/**
 * Builds the "query executor" widget: an experiment selector, Run/Stop buttons,
 * a live status line and a collapsible log. Clicking Run executes every
 * pending query of the selected experiment against each filter backend and
 * records per-backend timings and result lengths.
 *
 * A query counts as successful when every backend returns the same number of
 * rows. Successful results are inserted into `queryExecCollection` through
 * `storageClient` when one is given, otherwise they are pushed onto
 * `target.success`. Mismatches and thrown errors are pushed onto
 * `target.failed`.
 *
 * Relies on the Observable globals `Inputs`, `html` and `htl`.
 *
 * @param {object} [options]
 * @param {Object<string, Array<object>>} [options.queries] In-memory query
 *   plans keyed by experiment name; only read when `storageClient` is absent.
 *   Each query is read as `{ queryId, dataset, query }`.
 * @param {number} [options.sleepTime=100] Refresh interval (ms) of the status
 *   line and log panel while an experiment is running.
 * @param {Function} [options.storageClient] Async function called with
 *   `{ db, collection, operation, query, options | data }`. "find" and
 *   "aggregate" results are indexed as arrays; "insert" results must expose
 *   `acknowledged`.
 * @param {Array<Function>} [options.filters] Async factories, each resolving
 *   to `{ filterName, initialize(data), filterData(queryMap), clearClient? }`
 *   where `filterData` resolves to `{ res, filterTime }`.
 * @param {Array<object>} [options.log=[]] Array that receives
 *   `{ msg, level, queryId }` entries; also exposed as `target.log`.
 * @param {string} [options.queryPlanCollection="queryPlanner"]
 * @param {string} [options.queryExecCollection="queryExecutor"]
 * @param {string} [options.dbName="benchmark-moma"]
 * @param {object} options.datasetsAccessor Object exposing `datasets`,
 *   `clearAll()` and `selectDataset(name)` (which returns a loader function).
 * @param {number} [options.logLevel] Highest level that is recorded
 *   (0 = errors, 1 = progress, 2 = per-query detail). Nothing is recorded when
 *   it is `null`/`undefined`; `0` records errors only.
 * @param {number} [options.initalCounter=0] Number of queries considered
 *   already completed when no stored progress exists.
 * @returns {Promise<HTMLElement|undefined>} The widget element carrying
 *   `log`, `success` and `failed`; `undefined` if building the widget failed
 *   (the error is printed to the console).
 */
async function queriesExecutor({
  queries,
  sleepTime = 100,
  storageClient,
  filters = [vanillaJSFilter, duckDBFilter, arqueroFilter],
  log = [],
  queryPlanCollection = "queryPlanner",
  queryExecCollection = "queryExecutor",
  dbName = "benchmark-moma",
  datasetsAccessor,
  logLevel,
  initalCounter = 0
} = {}) {
  // Pause between two consecutive queries, in milliseconds.
  const QUERY_PAUSE_MS = 100;
  // Caller-supplied starting point, kept so that an experiment without stored
  // results does not inherit the counter left behind by a previous run.
  const startCounter = initalCounter;
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  let target;
  try {
    const queryCounter = { success: 0, failed: 0 };
    let totalQueries;
    let statusLock = false;
    let queriesStatus = null;
    let statusLog = null;

    // Records a message when its level is within the configured logLevel.
    // Callers pass the origin as `id`; `queryId` is accepted as an alias.
    function addLog({ msg, level, id, queryId = id }) {
      // Explicit null check: logLevel 0 is a valid setting (errors only).
      if (logLevel == null) {
        return;
      }
      if (level <= logLevel) {
        // The level is stored because the status panel filters on it.
        log.push({ msg, level, queryId });
      }
    }

    // Resolves the list of experiment names; returns [] on any failure.
    async function getExperiments() {
      addLog({
        msg: `Query Executor: Fetching Experiments`,
        level: 1,
        id: "Init"
      });
      let experimentNames = [];
      try {
        if (storageClient) {
          const res = await storageClient({
            db: dbName,
            collection: queryPlanCollection,
            operation: "aggregate",
            query: [{ $group: { _id: "$experimentName" } }]
          });
          if (res instanceof Error) {
            throw res;
          }
          experimentNames = res?.map((d) => d._id) ?? [];
        } else {
          experimentNames = Object.keys(queries);
        }
      } catch (err) {
        addLog({
          msg: `Query Executor: Fetching experiments failed with error ${err} `,
          level: 0,
          id: "Completed"
        });
        console.error(
          `Query Executor: Fetching experiments failed with error ${err} `
        );
      }
      addLog({
        msg: `Query Executor: Fetched Experiments -> ${experimentNames}`,
        level: 1,
        id: "Init"
      });
      return experimentNames;
    }

    // Fetches the most recent document (highest queryId) of a collection.
    async function findLatest(collection, experimentName) {
      const response = await storageClient({
        db: dbName,
        collection,
        operation: "find",
        query: { experimentName },
        options: { sort: { queryId: -1 }, limit: 1 }
      });
      return response[0];
    }

    // Returns the next query to execute, or undefined when it does not exist.
    async function fetchNextQuery(experimentName) {
      if (!storageClient) {
        return queries[experimentName][initalCounter];
      }
      const res = await storageClient({
        db: dbName,
        collection: queryPlanCollection,
        operation: "find",
        query: { experimentName, queryId: initalCounter + 1 }
      });
      return res[0];
    }

    // Runs one query in every backend and returns the result record plus the
    // set of distinct result lengths.
    async function executeInBackends(query) {
      if (!Object.keys(datasetsAccessor.datasets).includes(query["dataset"])) {
        await datasetsAccessor.clearAll();
        // selectDataset returns the loader function, which is invoked here.
        await datasetsAccessor.selectDataset(query["dataset"])();
      }
      const data = datasetsAccessor.datasets[query["dataset"]];
      // Backends agree when they all produce the same number of rows.
      const lengths = new Set();
      const resLog = { ...query, inputDataLenght: data.length };
      for (const createFilter of filters) {
        const filter = await createFilter();
        addLog({
          msg: `Query Executor: Executing query ${query.queryId} in ${filter.filterName}`,
          level: 2,
          id: initalCounter
        });
        console.log("execution", filter);
        const initStart = performance.now();
        await filter.initialize(data);
        const initEnd = performance.now();
        const queryMap = new Map(Object.entries(query.query));
        const { res, filterTime } = await filter.filterData(queryMap);
        // Some backends (e.g. duckdb) hold a client that must be released.
        if (filter.clearClient) {
          await filter.clearClient();
        }
        lengths.add(res.length);
        resLog[`${filter.filterName}-init_time`] = initEnd - initStart;
        resLog[`${filter.filterName}-filter_time`] = filterTime;
        resLog[`${filter.filterName}-length`] = res.length;
      }
      return { resLog, lengths };
    }

    // Executes all pending queries of the currently selected experiment.
    async function runFn() {
      console.time("Query Executor");
      try {
        // Captured once so a selection change mid-run cannot mix experiments.
        const experimentName = experimentSelect.value;
        if (storageClient) {
          const lastExecuted = await findLatest(
            queryExecCollection,
            experimentName
          );
          initalCounter = lastExecuted ? lastExecuted.queryId : startCounter;
          const lastPlanned = await findLatest(
            queryPlanCollection,
            experimentName
          );
          totalQueries = lastPlanned?.queryId;
        } else {
          const queryIds = queries[experimentName].map((d) => d.queryId);
          console.log("queries", queryIds);
          totalQueries = Math.max(...queryIds);
        }
        addLog({
          msg: `Query Executor: Total Queries : ${totalQueries} | Completed Queries ${initalCounter}`,
          level: 1,
          id: "Init"
        });

        let queryTime = performance.now();
        while (initalCounter < totalQueries && !stopExperiment.value) {
          let resLog;
          try {
            const query = await fetchNextQuery(experimentName);
            if (!query) {
              throw new Error(
                `Query ${initalCounter + 1} not found for experiment ${experimentName}`
              );
            }
            addLog({
              msg: `Query Executor: Executing query ${query.queryId}`,
              level: 2,
              id: initalCounter
            });
            // Seed resLog so a failure inside a backend still reports the query.
            resLog = { ...query };
            const outcome = await executeInBackends(query);
            resLog = outcome.resLog;

            if (outcome.lengths.size === 1) {
              delete resLog["query"];
              if (storageClient) {
                const res = await storageClient({
                  db: dbName,
                  collection: queryExecCollection,
                  operation: "insert",
                  data: [resLog]
                });
                if (!res.acknowledged) {
                  throw res;
                }
              } else {
                target.success.push(resLog);
              }
              queryCounter.success += 1;
            } else {
              target.failed.push({ ...resLog, Error: "Length Not Matched" });
              queryCounter.failed += 1;
            }
            await sleep(QUERY_PAUSE_MS);
          } catch (err) {
            console.error(err);
            target.failed.push({ ...resLog, Error: err });
            addLog({
              msg: `Query Executor: Executing queries error with ${err}`,
              level: 0,
              id: initalCounter
            });
            queryCounter.failed += 1;
          } finally {
            initalCounter += 1;
            // Notify listeners of the widget that its results changed.
            target.dispatchEvent(new Event("input", { bubbles: true }));
          }
          console.log(
            `query ${initalCounter} ran for ${
              (performance.now() - queryTime) / 1000
            }`
          );
          queryTime = performance.now();
        }
      } finally {
        // Always paired with the console.time above, even when a run throws.
        console.timeEnd("Query Executor");
      }
    }

    // Redraws the status line and the last ten error/progress log messages.
    function renderStatus() {
      queriesStatus?.remove();
      queriesStatus = htl.html`${
        stopExperiment.value ? "Stopping" : "Running"
      }... | Completed ${initalCounter} out of ${totalQueries} | Failed ${
        queryCounter.failed
      }`;
      target.appendChild(queriesStatus);

      statusLog?.remove();
      const recent = log
        .filter((d) => d?.level < 2)
        .slice(-10)
        .reverse();
      // One element per message so line breaks do not depend on how the
      // template tag treats interpolated "<br>" strings.
      statusLog = html`<div style='font-size:15px'>${recent.map(
        ({ msg }) =>
          html`<div>${msg.length > 70 ? msg.slice(0, 70) + "..." : msg}</div>`
      )}</div>`;
      status.appendChild(statusLog);
    }

    // Keeps the status fresh while an experiment is running.
    async function updateStatus() {
      while (statusLock) {
        renderStatus();
        await sleep(sleepTime);
      }
    }

    const expNames = await getExperiments();
    const experimentSelect = Inputs.select(expNames, {
      label: "Select the experiment"
    });
    const runExperiment = Inputs.button("Run Experiment");
    const stopExperiment = Inputs.button("Stop Experiment");
    const status = html`<details style="max-width: 600px; background: #fffced; box-sizing: border-box; padding: 10px 20px;"><summary style="font-weight: bold; cursor: pointer; outline: none;">Log</details>`;
    target = html`<div>${experimentSelect} <div style='display:flex;align-items:left' > ${runExperiment} ${stopExperiment} </div>${status}</div>`;
    target.log = log;
    target.success = [];
    target.failed = [];

    runExperiment.addEventListener("click", async () => {
      addLog({
        msg: `Query Executor: Starting Experiment ${experimentSelect.value}`,
        level: 1,
        id: "Init"
      });
      runExperiment.childNodes[0].disabled = true;
      stopExperiment.value = false;
      statusLock = true;
      // Intentionally not awaited: the loop ends once statusLock is cleared.
      updateStatus().catch((err) => console.error(err));
      try {
        await runFn();
        target.log = log;
        // Final refresh before the polling loop is stopped.
        renderStatus();
        await sleep(sleepTime);
      } catch (err) {
        console.error(err);
        addLog({
          msg: `Query Executor: Experiment ${experimentSelect.value} failed with ${err}`,
          level: 0,
          id: "Completed"
        });
      } finally {
        // Release the UI even when the run failed, otherwise the button stays
        // disabled and the polling loop never terminates.
        statusLock = false;
        runExperiment.childNodes[0].disabled = false;
      }
      queriesStatus?.remove();
      queriesStatus = htl.html`Query Executor Execution Completed | Success ${queryCounter.success} | Failed ${queryCounter.failed}`;
      target.appendChild(queriesStatus);
      addLog({
        msg: `Query Executor: Experiment ${experimentSelect.value} Completed`,
        level: 1,
        id: "Completed"
      });
    });

    // The running loop checks this flag before starting the next query.
    stopExperiment.addEventListener("click", () => {
      stopExperiment.value = true;
    });
  } catch (err) {
    console.error(err);
  }
  return target;
}