Published
Edited
Jul 29, 2022
4 stars
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
// One-off read of every stored aggregation under "aggregations/GDD".
// Resolves to the snapshot value (null when nothing is stored there).
// Uses the promise returned by once() directly so that a failed read rejects
// the cell instead of leaving it pending forever.
historyRaw = db
  .ref("aggregations/GDD")
  .once("value")
  .then((snap) => snap.val())
Insert cell
// Flattens historyRaw ({ device: { timestampSeconds: gdd } }) into an array of
// { device, timestamp, gdd } rows, one per numeric reading.
// - Non-numeric gdd values are skipped.
// - The timestamp key is treated as Unix seconds and converted to a Date.
// - A null/undefined historyRaw (no data stored) yields an empty array.
historical = Object.entries(historyRaw ?? {}).flatMap(([device, readings]) =>
  Object.entries(readings ?? {})
    .filter(([, gdd]) => typeof gdd === "number")
    .map(([timestamp, gdd]) => ({
      device,
      timestamp: new Date(Number(timestamp) * 1000),
      gdd
    }))
)
Insert cell
Insert cell
todaySeconds = new Date(new Date().toISOString().substring(0, 10)).getTime() /
1000
Insert cell
// Live stream of readings for sensor dev:864475046458393 whose key is at or
// after todaySeconds. Each accepted reading is appended to `dataset` and the
// cell yields { dataset, row } where `row` is the reading just received.
latestDataRaw = Generators.observe((notify) => {
  reset; // dataflow link: re-run this cell whenever `reset` changes

  // Reuse the array held by the previous value of this cell, if there is one.
  const dataset = this?.dataset ?? [];

  const dataHandler = (snapshot) => {
    const data = snapshot.val();
    // Accept only readings that carry a temperature. Comparing against null
    // (rather than truthiness) keeps a legitimate 0 reading, and the optional
    // chaining stops a record without `body` from throwing in the listener.
    if (data?.body?.temperature == null) return;
    dataset.push(data);
    notify({
      dataset,
      row: data
    });
  };

  const ref = db
    .ref(`history/sensors/dev:864475046458393`)
    .orderByKey()
    .startAt(todaySeconds + "");

  ref.on("child_added", dataHandler);

  // When the cell is invalidated, empty the shared array and detach the
  // listener so a re-run starts from a clean subscription.
  invalidation.then(() => {
    dataset.length = 0;
    ref.off("child_added", dataHandler);
  });
})
Insert cell
Insert cell
latestDataGdd = latestDataRaw.dataset
.map((entry) => ({
device: entry.device,
timestamp: new Date(entry.when * 1000),
temperature: entry.body.temperature
}))
.reduce(
(gdd, row) =>
gdd + (row.temperature > 5 && row.temperature < 25 ? 0.25 : 0),
0
)
Insert cell
latestData = [
{
device: "dev:864475046458393",
timestamp: new Date(todaySeconds * 1000),
gdd: latestDataGdd
}
]
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
// Live view of the last 24 hours of readings for `querySensorName`.
// Yields the whole snapshot value each time it changes; empty snapshots are
// ignored. The returned function detaches the listener when the generator is
// disposed, so re-running the cell (e.g. when querySensorName changes) does
// not leave the previous subscription attached.
liveViewRaw = Generators.observe((notify) => {
  const ref = db
    .ref(`history/sensors/${querySensorName}`)
    .orderByKey()
    .startAt(Math.floor(Date.now() / 1000 - 60 * 60 * 24) + ""); // reduce data returned

  const onValue = (snapshot) => {
    const data = snapshot.val();
    if (data) notify(data);
  };

  ref.on("value", onValue);
  return () => ref.off("value", onValue);
})
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
fullduck = {
// Refresh fullduck every insertion (causes analytics to be rerun)
doInsertRowToDuckDB;
return duckdb;
}
Insert cell
Insert cell
// Mutable value, initially 0, that other cells reference as a bare `reset;`
// statement so they re-run when it changes (latestDataRaw, doCreateTable and
// viewof aggregationRequest do this). Where it is reassigned is not visible
// in this part of the notebook.
mutable reset = 0
Insert cell
Insert cell
Insert cell
doCreateTable = {
reset;
await duckdb.query(`DROP TABLE IF EXISTS readings`);
return duckdb.query(
`CREATE TABLE IF NOT EXISTS readings(sensor STRING, time TIMESTAMP, temperature FLOAT)`
);
}
Insert cell
Insert cell
// Shows the current value of `firebaseRow`. Its definition is outside this
// view; insertIntoDb pushes rows into it via `viewof firebaseRow.send(...)`.
firebaseRow
Insert cell
Insert cell
Insert cell
Insert cell
fullduck
-- Growth hours per sensor for readings strictly between startTimeIso and
-- endTimeIso: sums the gaps between consecutive readings that count as
-- "growth" and converts the total from seconds to hours.
-- NOTE(review): presumably this is the cell read as `aggregation` by
-- growthHoursAggregation - confirm the cell name.
SELECT sensor, CAST(SUM(EXTRACT(epoch from growthDuration)) AS FLOAT) / (60 * 60) as growthHours
-- growthDuration is the full gap when the row is a growth row, otherwise zero.
FROM (SELECT *, CASE WHEN isGrowth THEN duration ELSE INTERVAL 0 seconds END growthDuration
FROM (
-- duration = time between the previous reading and this one; isGrowth tests
-- this row's own temperature (5 < temperature < 25).
-- NOTE(review): avgTemperature is computed but not used for isGrowth - confirm
-- that the raw temperature is the intended input.
SELECT sensor, startTime, endTime, avgTemperature, endTime - startTime as duration, temperature > 5 AND temperature < 25 AS isGrowth
FROM (
SELECT sensor, time, temperature,
AVG(temperature) OVER lookback avgTemperature,
MIN(time) OVER lookback startTime,
MAX(time) OVER lookback endTime
FROM readings
WHERE time > ${startTimeIso} AND time < ${endTimeIso}
-- Window frame = the previous row and the current row, ordered by time.
-- NOTE(review): the window is not partitioned by sensor, so rows from
-- different sensors would be paired if the table held more than one.
WINDOW lookback AS (
ORDER BY "time"
ROWS 1 PRECEDING)
ORDER BY time
)
)
) GROUP BY sensor
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
aggregate = (targetDay, sensor) =>
viewof aggregationRequest.send({
targetDay,
sensor
})
Insert cell
viewof aggregationRequest = (reset,
flowQueue({
timeout_ms: 10000000
}))
Insert cell
Insert cell
// Shows the aggregation request currently being processed (the value of
// `viewof aggregationRequest`); aggregate() sends { targetDay, sensor } objects.
aggregationRequest
Insert cell
startTime = new Date(aggregationRequest.targetDay)
Insert cell
Insert cell
endTime = new Date(startTime.getTime() + 60 * 60 * 24 * 1000)
Insert cell
Insert cell
Insert cell
// One-off read of the requested sensor's history whose keys (Unix seconds, as
// strings) fall between startTime and endTime inclusive. Resolves to the
// snapshot value (null when there is no data in range).
// Uses the promise returned by once() directly so that a failed read rejects
// the cell instead of leaving the aggregation pending forever.
aggregationSensorDataRaw = db
  .ref(`history/sensors/${aggregationRequest.sensor}`)
  .orderByKey()
  .startAt(startTime.getTime() / 1000 + "")
  .endAt(endTime.getTime() / 1000 + "")
  .once("value")
  .then((snap) => snap.val())
Insert cell
Insert cell
// Readings from aggregationSensorDataRaw that carry a temperature, as an
// array (keys are discarded). Empty when nothing was returned.
// The filter compares against null rather than relying on truthiness so that a
// 0 reading is kept, and optional chaining skips entries without a `body`
// instead of throwing.
aggregationSensorData = Object.values(aggregationSensorDataRaw ?? {}).filter(
  (reading) => reading?.body?.temperature != null
)
Insert cell
Insert cell
prepareAggregation = {
aggregationSensorData;
}
Insert cell
Insert cell
insertIntoDb = {
prepareAggregation;
doCreateTable; // ensure DuckDB is reset and tables recreated
if (aggregationSensorData.length === 0) {
viewof firebaseRow.send(null);
} else {
return await Promise.all(
aggregationSensorData.map((data) => viewof firebaseRow.send(data))
);
}
}
Insert cell
Insert cell
growthHoursAggregation = {
insertIntoDb; // dataflow link to do after insertion
return aggregation[0]?.growthHours || 0;
}
Insert cell
Insert cell
writeAggregation = {
await firebase.auth().signInAnonymously();
return db
.ref(
`/aggregations/GDD/${aggregationRequest.sensor}/${
startTime.getTime() / 1000
}`
)
.set(growthHoursAggregation);
}
Insert cell
Insert cell
returnResultToFlowQueue = {
writeAggregation;
return viewof aggregationRequest.resolve(growthHoursAggregation);
}
Insert cell
Insert cell
Insert cell
/**
 * Requests the aggregation of one day for several sensors at once.
 *
 * @param {string} day - Target day passed through to `aggregate`
 *   (the backfill button passes "YYYY-MM-DD" strings).
 * @param {string[]} [devices] - Sensor ids to aggregate; defaults to the three
 *   known devices, so existing single-argument callers behave as before.
 * @returns {Promise<Array>} Resolves with one `aggregate` result per device,
 *   in the same order as `devices`; rejects if any aggregation rejects.
 */
function aggregateAll(
  day,
  devices = [
    "dev:864475046458393",
    "dev:864475046461397",
    "dev:864475046554399"
  ]
) {
  return Promise.all(devices.map((device) => aggregate(day, device)));
}
Insert cell
// Button that backfills the aggregations for 22-26 July 2022, one day at a
// time: each day's aggregateAll must finish before the next one starts.
Inputs.button("backfill", {
  reduce: async () => {
    const firstDay = 22;
    const lastDay = 26;
    for (let day = firstDay; day <= lastDay; day += 1) {
      const dayOfMonth = String(day).padStart(2, "0");
      await aggregateAll(`2022-07-${dayOfMonth}`);
    }
  }
})
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell

Purpose-built for displays of data

Observable is your go-to platform for exploring data and creating expressive data visualizations. Use reactive JavaScript notebooks for prototyping and a collaborative canvas for visual data exploration and dashboard creation.
Learn more