Public
Edited
Jul 20, 2023
1 fork
Importers
Insert cell
Insert cell
Insert cell
// Observable cell: builds the query-executor widget, passing `mongodbClient`
// as its storage client. NOTE(review): `mongodbClient` is not defined in this
// view — presumably another notebook cell provides it; confirm.
viewof q = queryExecutor({ storageClient: mongodbClient })
Insert cell
// Observable cell: reads the `log` property that queryExecutor attaches to the
// element it returns (`target.log = log`), i.e. the array of `{ msg, level }` entries.
viewof q.log
Insert cell
/**
 * Builds the "query executor" widget: an experiment selector, a "Run Experiment"
 * button and a collapsible log. Clicking the button runs every planned query of
 * the selected experiment against each backend, records timings and result
 * lengths, and stores one result record per query whose backends all returned
 * the same number of rows.
 *
 * @param {object} [options]
 * @param {object} [options.queries] In-memory store used when no `storageClient`
 *   is given: an object keyed by collection name, where
 *   `queries[queryPlanCollection]` holds the planned queries and
 *   `queries[queryExecCollection]` receives the result records.
 * @param {number} [options.sleepTime=1] Milliseconds to pause between backends
 *   and between status refreshes.
 * @param {Function} [options.storageClient] Async function called with
 *   `{ db, collection, operation, query, options, data }`; used to read query
 *   plans and to store results.
 * @param {Function[]} [options.backends] Async factories; each must resolve to an
 *   object exposing `filterer`, `initialize(data, log)` and `filterData(queryMap)`.
 * @param {Array<{msg: string, level: number}>} [options.log=[]] Array that log
 *   entries are appended to; also exposed as `.log` on the returned element.
 * @param {string} [options.queryPlanCollection="queryPlanner"]
 * @param {string} [options.queryExecCollection="queryExecutor"]
 * @param {string} [options.dbName="benchmark"]
 * @returns {Promise<Element>} The widget element, with a `log` property.
 */
async function queryExecutor({
  queries,
  sleepTime = 1,
  storageClient, // To store and query queries and its results
  backends = [arqueroFilterer, duckDBFilterer, vanillaJSFilter],
  log = [],
  queryPlanCollection = "queryPlanner",
  queryExecCollection = "queryExecutor",
  dbName = "benchmark"
} = {}) {
  const queryCounter = { success: 0, failed: 0 };
  const expNames = await getExperiments();
  const experimentSelect = Inputs.select(expNames, {
    label: "Select the experiment"
  });
  const runExperiment = Inputs.button("Run Experiment");
  const status = html`<details style="max-width: 600px; background: #fffced; box-sizing: border-box; padding: 10px 20px;"><summary style="font-weight: bold; cursor: pointer; outline: none;">Log</summary></details>`;
  const target = html`${experimentSelect} ${runExperiment} ${status}`;

  let statusLog = html``;
  let statusLock = false; // true while an experiment is running; drives the status poller
  let queriesStatus = html``;
  const datasetsHelper = await getDatasets();
  target.log = log;

  // Resolve after `ms` milliseconds.
  function sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  // Human-readable form of a value for log messages (a plain `${obj}` prints "[object Object]").
  function describe(value) {
    try {
      return JSON.stringify(value);
    } catch {
      return String(value);
    }
  }

  // In-memory query plans that belong to the given experiment.
  function localPlans(experimentName) {
    const plans = queries?.[queryPlanCollection] ?? [];
    return plans.filter((d) => d?.experimentName === experimentName);
  }

  // Runs all not-yet-executed queries of the currently selected experiment.
  async function runFn() {
    // Read the selection once so changing the dropdown mid-run cannot mix experiments.
    const experimentName = experimentSelect.value;
    let completed = 0;
    let totalQueries;

    if (storageClient) {
      // Resume after the highest queryId that already has a stored result.
      const lastExecuted = await storageClient({
        db: dbName,
        collection: queryExecCollection,
        operation: "find",
        query: { experimentName },
        options: { sort: { queryId: -1 }, limit: 1 }
      });
      completed = lastExecuted[0] ? lastExecuted[0].queryId : 0;

      // The highest planned queryId is the number of queries to run.
      const lastPlanned = await storageClient({
        db: dbName,
        collection: queryPlanCollection,
        operation: "find",
        query: { experimentName },
        options: { sort: { queryId: -1 }, limit: 1 }
      });
      totalQueries = lastPlanned[0]?.queryId ?? 0;
    } else {
      totalQueries = localPlans(experimentName).reduce(
        (max, d) => Math.max(max, d.queryId),
        0
      );
    }

    log.push({
      msg: `Query Executor: Total Queries : ${totalQueries} | Completed Queries ${completed}`,
      level: 1
    });

    //Executing queries
    while (completed < totalQueries) {
      const queryId = completed + 1;
      try {
        let query;
        if (storageClient) {
          const res = await storageClient({
            db: dbName,
            collection: queryPlanCollection,
            operation: "find",
            query: { experimentName, queryId }
          });
          query = res[0];
        } else {
          query = localPlans(experimentName).find((d) => d.queryId === queryId);
        }
        if (!query) {
          throw new Error(`No query plan found for queryId ${queryId}`);
        }
        const queryText = describe(query);
        log.push({
          msg: `Query Executor: Executing query ${queryText}`,
          level: 2
        });

        // Load the dataset only when it is not already held by the helper.
        if (!Object.keys(datasetsHelper.datasets).includes(query["dataset"])) {
          await datasetsHelper.clearAll();
          await datasetsHelper.selectDataset(query["dataset"])();
          console.log({ datasetsHelper });
        }
        const data = datasetsHelper.datasets[query["dataset"]];

        //lengths to check if all the backend gives the same result based on the length
        const lengths = new Set();
        const resLog = { ...query };

        //executing in backends
        for (const backend of backends) {
          let filter = await backend();
          log.push({
            msg: `Query Executor: Executing query ${queryText} in ${filter.filterer}`,
            level: 2
          });

          const initStart = performance.now();
          await filter.initialize(data, log);
          const initEnd = performance.now();

          //filter the data based on query map
          const queryMap = new Map(Object.entries(query.query));
          console.log({ queryMap });
          const filterStart = performance.now();
          const res = await filter.filterData(queryMap);
          const filterEnd = performance.now();

          lengths.add(res.length);
          resLog[filter.filterer] = {
            init_time: initEnd - initStart,
            filter_time: filterEnd - filterStart,
            length: res.length
          };
          // Drop the reference before pausing so the backend is not kept alive by this frame.
          filter = null;
          await sleep(sleepTime);
        }

        //If the size is same then results match
        if (lengths.size === 1) {
          if (storageClient) {
            const res = await storageClient({
              db: dbName,
              collection: queryExecCollection,
              operation: "insert",
              data: [resLog]
            });
            if (!res?.acknowledged) {
              throw new Error(`Storing result was not acknowledged: ${describe(res)}`);
            }
          } else {
            if (!Array.isArray(queries[queryExecCollection])) {
              queries[queryExecCollection] = [];
            }
            queries[queryExecCollection].push(resLog);
          }
          queryCounter.success += 1;
        } else {
          log.push({
            msg: `Query Executor: Backends returned differing result lengths (${[...lengths].join(", ")}) for query ${queryId}`,
            level: 0
          });
          queryCounter.failed += 1;
        }
      } catch (err) {
        log.push({
          msg: `Query Executor: Executing queries error with ${err}`,
          level: 0
        });
        queryCounter.failed += 1;
      } finally {
        completed += 1;
      }
    }
  }

  // Replace the one-line progress summary shown under the controls.
  function showSummary(node) {
    queriesStatus?.remove();
    queriesStatus = node;
    target.appendChild(queriesStatus);
  }

  // Re-render the ten most recent level < 2 log messages, newest first, truncated to 50 chars.
  function renderLog() {
    statusLog?.remove();
    statusLog = html`<div style='font-size:15px'>${log
      .filter((d) => d.level < 2)
      .slice(-10)
      .reverse()
      .map((d) => (d.msg.length > 50 ? d.msg.slice(0, 50) + "..." : d.msg))
      .join("<br>")}</div>`;
    status.appendChild(statusLog);
  }

  runExperiment.addEventListener("click", async () => {
    const button = runExperiment.childNodes[0];

    // Counts are per run; without the reset a second run would report cumulative totals.
    queryCounter.success = 0;
    queryCounter.failed = 0;

    log.push({
      msg: `Query Executor: Starting Experiment ${experimentSelect.value}`,
      level: 1
    });

    button.disabled = true;
    statusLock = true;
    // Intentionally not awaited: refreshes the UI in the background until statusLock clears.
    updateStatus().catch((err) =>
      console.error(`Query Executor: status refresh failed with ${err}`)
    );

    try {
      await runFn();
      log.push({
        msg: `Query Executor: Experiment ${experimentSelect.value} Completed`,
        level: 1
      });
    } catch (err) {
      log.push({
        msg: `Query Executor: Experiment ${experimentSelect.value} aborted with error ${err}`,
        level: 0
      });
    } finally {
      // Always release the lock and the button, even when the run aborted,
      // otherwise the poller would spin forever and the widget would stay disabled.
      statusLock = false;
      button.disabled = false;
      renderLog();
      showSummary(
        htl.html`Query Executor Execution Completed | Success ${queryCounter.success} | Failed ${queryCounter.failed}`
      );
    }
  });

  // Refreshes the summary line and the log view every `sleepTime` ms while a run is active.
  async function updateStatus() {
    while (statusLock) {
      showSummary(
        htl.html`Running... | Completed ${queryCounter.success} | Failed ${queryCounter.failed}`
      );
      renderLog();
      await sleep(sleepTime);
    }
  }

  //Functions to get Experiment Names
  // Returns the distinct experiment names as an array; on failure logs the error and returns [].
  async function getExperiments() {
    log.push({
      msg: `Query Executor: Fetching Experiments`,
      level: 1
    });

    let experimentNames = [];
    try {
      if (storageClient) {
        const res = await storageClient({
          db: dbName,
          collection: queryPlanCollection,
          operation: "aggregate",
          query: [
            {
              $group: {
                _id: "$experimentName"
              }
            }
          ]
        });
        if (res instanceof Error) {
          throw res;
        }
        experimentNames = res?.map((d) => d._id) ?? [];
      } else {
        // Spread the Set so both modes return an array.
        experimentNames = [
          ...new Set(queries[queryPlanCollection].map((d) => d?.experimentName))
        ];
      }
    } catch (err) {
      log.push({
        msg: `Query Executor: Fetching experiments failed with error ${err} `,
        level: 0
      });
      console.error(
        `Query Executor: Fetching experiments failed with error ${err} `
      );
    }

    log.push({
      msg: `Query Executor: Fetched Experiments -> ${experimentNames}`,
      level: 1
    });
    return experimentNames;
  }

  return target;
}
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell

Purpose-built for displays of data

Observable is your go-to platform for exploring data and creating expressive data visualizations. Use reactive JavaScript notebooks for prototyping and a collaborative canvas for visual data exploration and dashboard creation.
Learn more