Public
Edited
Apr 29, 2023
Paused
1 fork
Importers
Insert cell
Insert cell
Insert cell
// viewof template = queryExecutor({
// fetchQueriesFn: async (d) => (d < 5 ? d : false),
// runFn: (query, backend, successFn, failedFn, log) => {
// let res = query % 2 == 0 ? successFn() : failedFn();
// if (res === "failed") {
// throw new Error("Required");
// }
// log[res].push({ query, backend, res });
// },
// successFn: () => "success",
// failedFn: () => "failed",
// testIds: [1]
// })
Insert cell
// Inspect the log object the executor attaches to its view
// (it is created with `success`, `failed` and `error` arrays).
viewof queryExecu.log
Insert cell
viewof queryExecu.log["failed"].map((d) => {
return { id: d.res.queryId, comb: d.res.comb };
})
Insert cell
Insert cell
viewof queryExecu = queryExecutor({
testIds: await getTestids(),
//Fetch queries from Mongodb based on testId and queryId
fetchQueriesFn: async (testId, queryId) =>
await mongodbClient({
...mongoDetailsQueryPlan,
operation: "find",
query: { testId, queryId }
}),
//Function to run the query
runFn: async ({
query,
backends,
data,
successFn,
failedFn,
log,
displayLog,
testId,
queryCount
}) => {
let length = {};
let resLog = { ...query };
let lengths = new Set();
let values = [];
try {
console.log(`running ${{ testId, query: query.queryId }}`);
for (let backend of backends) {
let filter = await backend();
//Initialize
displayLog("data", data);
let init_start = performance.now();
await filter.initialize(data);
let init_end = performance.now();
displayLog("query", query);
displayLog(`running in ${filter.filterer}`);
//filter the data based on query map
let queryMap = new Map(Object.entries(query.query));
let filter_start = performance.now();
let res = await filter.filterData(queryMap);
values.push(res);
let filter_end = performance.now();
displayLog("result", res, res?.length);

lengths.add(res.length);
resLog[filter.filterer] = {};
let factors = {
init_time: init_end - init_start,
filter_time: filter_end - filter_start,
length: res.length
};
Object.assign(resLog[filter.filterer], factors);
console.log(resLog);
filter = null;
}
if (lengths.size === 1) {
log["success"].push({
resLog
});
successFn({ res: resLog });
} else {
console.log("Failed in runFn");
failedFn({ res: resLog, error: "length not matched", lengths, log });
queryCount.failed += 1;
}
} catch (err) {
failedFn({ res: resLog, error: err, log });
queryCount.failed += 1;
}
},
successFn: async ({ res }) => {
delete res["_id"];
delete res["query"];
console.log("Success");
let { queryId, testId } = res;
console.log({ queryId, testId });
await mongodbClient({
...mongoDetailsSuccessQuery,
operation: "delete",
query: { queryId, testId }
});

return await mongodbClient({
...mongoDetailsSuccessQuery,
operation: "insert",
data: [{ ...res }]
});
},
failedFn: async ({ error, res, lengths, log }) => {
try {
console.log("Failed", res, error, lengths);
log["failed"].push({ res, error, lengths });

return await mongodbClient({
...mongoDetailsFailedQuery,
operation: "insert",
data: [{ error, res, lengths }]
});
} catch (err) {
console.err(err);
}
},
getTotalQueriesFn: async (testId) =>
await getMaxQueryId(mongoDetailsQueryPlan, testId),
getLastExecutedQueryFn: async (testId) =>
await getMaxQueryId(mongoDetailsSuccessQuery, testId),
//Data
data: await getdatasets(dataset),
backends: [arqueroFilterer, duckDBFilterer, nativeJSFilterer]
})
Insert cell
randomBackend = {
let backend = ["Arquero", "DuckDB", "VanillaJS"];
return backend[d3.randomInt(3)()];
}
Insert cell
// getMaxQueryId(mongoDetailsQueryPlan, "test-2")
Insert cell
// Preview the distinct testId values found in the query-plan collection.
getTestids()
Insert cell
/**
 * Lists the distinct `testId` values present in the query-plan collection.
 *
 * Groups the collection by `testId` through `mongodbClient` and returns the
 * group keys.
 *
 * @returns {Promise<Array>} The distinct test ids. An empty array is returned
 *   when the request fails or the client yields no result, so callers can
 *   always iterate the value.
 */
async function getTestids() {
  let res = [];
  try {
    res = await mongodbClient({
      ...mongoDetailsQueryPlan,
      operation: "aggregate",
      query: [
        {
          $group: {
            _id: "$testId"
          }
        }
      ]
    });

    // A nullish response is treated as "no experiments" rather than undefined.
    return res?.map((d) => d._id) ?? [];
  } catch (error) {
    displayLog({ res, error });
    // Previously fell through and returned undefined on failure.
    return [];
  }
}
Insert cell
/**
 * Fetches the document with the highest `queryId` for a test.
 *
 * @param {object} mongodetails Connection/collection details spread into the
 *   `mongodbClient` request.
 * @param {*} testId Test whose documents are searched.
 * @returns {Promise<object>} The first document of the result sorted by
 *   `queryId` descending, or `{ queryId: 0 }` when nothing matched or the
 *   request failed.
 */
async function getMaxQueryId(mongodetails, testId) {
  let res;
  try {
    res = await mongodbClient({
      ...mongodetails,
      operation: "find",
      query: { testId: testId },
      options: { sort: { queryId: -1 }, limit: 1 }
    });
  } catch (error) {
    console.error(error);
  }
  // The old `return res[0]` inside `finally` threw a TypeError whenever the
  // request had failed (res undefined); fall back to the zero sentinel instead.
  return res?.[0] || { queryId: 0 };
}
Insert cell
/**
 * Logs its arguments to the console, prefixed with the current time string.
 *
 * Uses `console.error` when the first argument carries a truthy `error`
 * property and `console.log` otherwise. Safe to call with no arguments or
 * with a nullish first argument (both previously threw a TypeError).
 *
 * @param {...*} args Values to log.
 */
function displayLog(...args) {
  const stamp = new Date().toTimeString();
  if (args[0]?.error) {
    console.error(stamp, ...args);
  } else {
    console.log(stamp, ...args);
  }
}
Insert cell
//Executor
/**
 * Builds the benchmark control view (run-all toggle, experiment selector, run
 * toggle, log toggle) and drives query execution while the run toggle is on.
 *
 * @param {object} [options]
 * @param {Array} options.testIds Experiment ids offered in the selector.
 * @param {Function} [options.fetchQueriesFn] `(testId, queryCounter)` ->
 *   list of queries; only the first entry is executed. A missing first entry
 *   ends the run.
 * @param {Function} options.runFn Function to run the query.
 * @param {Function} options.successFn Forwarded to `runFn`.
 * @param {Function} options.failedFn Forwarded to `runFn`; also called with
 *   `{ error, query }` when the executor itself hits an error.
 * @param {Function} options.getTotalQueriesFn `(testId)` -> object whose
 *   `queryId` is shown as the total number of queries.
 * @param {Function} options.getLastExecutedQueryFn `(testId)` -> object whose
 *   `queryId` is the last executed query; execution resumes after it.
 * @param {number} [options.initialCounter=1] Starting query counter until the
 *   selected experiment's last executed query has been looked up.
 * @param {Array} [options.backends] Backends to benchmark.
 * @param {boolean} [options.checkBackendPoss=false] When true, run every
 *   permutation of `backends` instead of one shuffled order.
 * @param {number} [options.sleepTime=1000] Pause (ms) after each query run.
 * @returns {Promise<Element>} The control view, with `log` (success / failed /
 *   error arrays) and `status` (getter for the current status label) attached.
 */
async function queryExecutor({
  testIds, //testId
  fetchQueriesFn = (d) => d,
  runFn, //Function to run the query
  successFn, //Function to run once the query ran successfully,
  failedFn, //Function to run once the query failed
  getTotalQueriesFn, //Function to get total queries
  getLastExecutedQueryFn, //Function to get Last Executed Query
  initialCounter = 1, //If not provided the last queryId will be used to initialize
  backends = ["Arquero", "DuckDB", "VanillaJS"], //backends to benchmark
  checkBackendPoss = false, //A flag to check all the permutations of backend like [arquero,duckdb] => generates [arquero,duckdb], [duckdb,arquero]
  sleepTime = 1000 //SleepTime after the query execution
} = {}) {
  const backendsComb = checkBackendPoss
    ? generatePermutations(backends)
    : [d3.shuffle(backends)];
  const log = { success: [], failed: [], error: [] };
  let query;
  const runAllTests = Inputs.toggle({ label: "Run All Experiments" });
  const runBtn = Inputs.toggle({ label: "Run Experiment" });
  const displayLogBtn = Inputs.toggle({ label: "Display Log" });
  let queryCounter = initialCounter;
  const testId = Inputs.select(testIds, { label: "Experiment Name" });
  // The run-all toggle and selector are only shown when there is a choice.
  const target = html` ${
    testIds?.length !== 1 ? html`${runAllTests} ${testId}` : ""
  } ${runBtn} ${displayLogBtn}`;
  // True while executeQueries is active, so a second loop is not started.
  let lock = false;
  let totalQueries;
  let data;
  let runBtnLabel;
  let dataset;
  let queryCount = { failed: 0 };

  // Timestamped logger that is silent unless the "Display Log" toggle is on.
  function displayLog(...args) {
    try {
      if (displayLogBtn.value) {
        if (args[0]?.error) {
          console.error(new Date().toTimeString(), ...args);
        } else {
          console.log(new Date().toTimeString(), ...args);
        }
      }
    } catch (error) {
      console.error(error);
    }
  }

  function getStatus() {
    return runBtnLabel;
  }

  // Runs queries for the selected experiment, one per iteration, until the
  // run toggle is switched off, the plan is exhausted, or an error occurs.
  async function executeQueries() {
    lock = true;
    try {
      const total = await getTotalQueriesFn(testId.value);
      totalQueries = total?.queryId;

      displayLog("Executing Queries");
      while (runBtn.value) {
        try {
          //Fetch Query
          const queries = await fetchQueriesFn(testId.value, queryCounter);
          //If there are duplicate, try to get 1st matching
          query = queries?.[0];
          displayLog(`fetched Query ${query} for testId ${testId.value}`);
          //label
          runBtnLabel?.remove();
          runBtnLabel = htl.html`Status: Running Query <b>${queryCounter}</b> out of ${totalQueries} || Failed : <b>${queryCount.failed}</b>`;
          target.appendChild(runBtnLabel);
          if (!query) {
            displayLog("Completed execution");
            runBtnLabel?.remove();
            runBtnLabel = htl.html`Completed All Queries Execiton`;
            target.appendChild(runBtnLabel);
            runBtn.value = false;
            return;
          }
          for (const backendOrder of backendsComb) {
            displayLog(`Executing query : ${queryCounter}`);

            // Reload the dataset only when the query targets a different one.
            if (dataset != query.dataset) {
              const loaded = await getdatasets(query.dataset);
              data = loaded[query.dataset];
              dataset = query.dataset;
            }
            await runFn({
              query,
              backends: backendOrder,
              data,
              successFn,
              failedFn,
              log,
              displayLog,
              testId: testId.value,
              queryCount
            });
            // Notify Observable so dependents of the view re-evaluate.
            target.dispatchEvent(new Event("input", { bubbles: true }));
            await new Promise((resolve) => setTimeout(resolve, sleepTime));
          }
        } catch (error) {
          log["error"].push({ query, error });
          displayLog({ error, query });
          // Await the callback so its rejection cannot go unhandled.
          try {
            await failedFn({ error, query });
          } catch (callbackError) {
            console.error(callbackError);
          }
          return;
        } finally {
          queryCounter += 1;
        }
      }
    } finally {
      // Always release the lock: previously it stayed set after a pause or an
      // error, so the run toggle could never start execution again.
      lock = false;
    }
  }

  runBtn.addEventListener("input", async () => {
    if (runBtn.value && !lock) {
      runBtnLabel?.remove();
      runBtnLabel = htl.html`Starting Execution`;
      runBtn.appendChild(runBtnLabel);
      await executeQueries();
    }
  });

  // Switching experiment stops the current run and repositions the counter.
  testId.addEventListener("input", async () => {
    runBtn.value = false;
    runBtn.dispatchEvent(new Event("input", { bubbles: true }));
    await updateQueryCounter();
  });

  // Refreshes the total and resumes after the last executed query.
  async function updateQueryCounter() {
    //Get details about the total queries in Query Plan
    const total = await getTotalQueriesFn(testId.value);
    totalQueries = total?.queryId;

    const successQuery = await getLastExecutedQueryFn(testId.value);
    console.log({ successQuery, test: testId.value });
    // A missing record means nothing has run yet, so start at query 1.
    queryCounter = (successQuery?.queryId ?? 0) + 1;
  }

  let runAllBtnLabel;
  runAllTests.addEventListener("input", async () => {
    if (!runAllTests.value) {
      runAllBtnLabel?.remove();
      return;
    }
    let testCounter = 1;
    try {
      for (const test of testIds) {
        //update queryCounter
        queryCount = { failed: 0 };

        //Add label
        runAllBtnLabel?.remove();
        runAllBtnLabel = html`Running test ${testCounter} out ${testIds.length}`;
        runAllTests.appendChild(runAllBtnLabel);
        // Re-checked per test so switching the toggle off skips the rest.
        if (runAllTests.value) {
          testId.childNodes[1].disabled = true;
          testId.value = test;
          await updateQueryCounter();
          runBtn.value = true;
          runBtn.childNodes[1].disabled = true;
          await executeQueries();
          await new Promise((resolve) => setTimeout(resolve, sleepTime / 10));
        }
        testCounter += 1;
      }
      runAllBtnLabel?.remove();
      runAllBtnLabel = html`All Tests execution completed`;
      // The completion label was created but never attached before.
      runAllTests.appendChild(runAllBtnLabel);
    } finally {
      // Re-enable the controls even when a test run throws.
      testId.childNodes[1].disabled = false;
      runAllTests.value = false;
      runBtn.childNodes[1].disabled = false;
    }
  });

  // Initialise the counter for the initially selected experiment.
  testId.dispatchEvent(new Event("input", { bubbles: true }));

  target.log = log;
  target.status = getStatus;
  return target;
}
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell

Purpose-built for displays of data

Observable is your go-to platform for exploring data and creating expressive data visualizations. Use reactive JavaScript notebooks for prototyping and a collaborative canvas for visual data exploration and dashboard creation.
Learn more