Published
Edited
Jul 22, 2022
1 fork
5 stars
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
clientConfig = ({
firebaseServer: {
url: databaseURL
}
})
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
// Test: a random string written through `rootA` is read back unchanged
// through the same root.
readYourWriteTest = suite.test("Read your own write - String", async () => {
  const written = randomString();
  const ref = rtdb.child(rootA, "collection/readyourownwrite");

  await rtdb.set(ref, written);

  const snapshot = await rtdb.get(ref);
  expect(snapshot.val()).toEqual(written);
})
Insert cell
Insert cell
// Test: a random string written through `rootA` is visible when the same
// path is read through `rootB`.
readTheirWriteTest = suite.test("Read their write - String", async () => {
  const sharedPath = "collection/readtheirwrite";
  const written = randomString();
  const writerRef = rtdb.child(rootA, sharedPath);
  const readerRef = rtdb.child(rootB, sharedPath);

  await rtdb.set(writerRef, written);

  const snapshot = await rtdb.get(readerRef);
  const observed = snapshot.val();
  expect(observed).toEqual(written);
})
Insert cell
Insert cell
// Test: a subscriber on `rootB` is notified when the same path is written
// through `rootA`. The test completes (via `done`) once the subscriber sees
// the exact payload that was written.
listenNotifiedOfWrite = suite.test(
  "onValue is notified of other's write",
  async (done) => {
    const payload = randomString();
    const locationA = rtdb.child(rootA, "collection/subscribetheirwrite");
    const locationB = rtdb.child(rootB, "collection/subscribetheirwrite");

    // NOTE(review): assumes `rtdb.onValue` returns an unsubscribe function, as
    // the Firebase modular SDK does. Guarded so a different return value is
    // simply ignored.
    let unsubscribe;
    unsubscribe = rtdb.onValue(locationB, (snap) => {
      const val = snap.val();
      console.log(val);
      if (_.isEqual(val, payload)) {
        // Detach the listener so it does not outlive the test.
        if (typeof unsubscribe === "function") {
          unsubscribe();
        }
        done();
      }
    });

    await rtdb.set(locationA, payload);
  }
)
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
viewof incomingLongpollRequest = flowQueue({
timeout_ms: 10000
})
Insert cell
incomingLongpollRequest
Insert cell
Insert cell
Insert cell
Insert cell
incomingLongpollRequestAction = {
const req = incomingLongpollRequest.req;
const res = incomingLongpollRequest.res;

const callbackId = req.query.cb;
if (req.query.start) {
// New session initialized
const cid = Math.random().toString(16).substring(3);

sessions[cid] = {
id: cid,
responseId: 0,
sessionId: Math.random().toString(16).substring(3),
password: Math.random().toString(16).substring(3),
hanginingRequest: null,
serverToClientQueue: []
};

res.header("content-type", "application/javascript");
res.send(newSessionResponse(callbackId, sessions[cid]));
} else if (req.query.dframe) {
res.header("content-type", "text/html");
res.send(disconnectFrame(sessions[req.query.id]));
} else {
const session = sessions[req.query.id];
if (!session) {
return viewof incomingLongpollRequest.reject(
new Error("Unrecognized connection id for " + req.query.id)
);
}
// If there is an existing long poll close the previous
if (session.hangingRequest) {
session.hangingRequest.send(`pRTLPCB(${session.responseId++},[])`);
session.hangingRequest = null;
}

res.header("content-type", "application/javascript");
var commandIndex = 0;
while (req.query[`d${commandIndex}`]) {
const data = JSON.parse(
atob(req.query[`d${commandIndex}`].replaceAll(".", "="))
);
commandIndex++;
console.log("Incoming request", data);
try {
const response = await viewof incomingRequest.send({
session,
request: data
});
session.serverToClientQueue.push({
t: "d",
d: { r: data.d.r, b: response }
});
} catch (err) {
console.error(err);
session.serverToClientQueue.push({
t: "d",
d: { s: "fail", d: err.message }
});
}
}

if (session.serverToClientQueue.length > 0) {
res.send(
`pRTLPCB(${session.responseId++},${JSON.stringify([
session.serverToClientQueue.shift()
])});`
);
} else {
session.hangingRequest = res;
}
}

viewof incomingLongpollRequest.respond();
}
Insert cell
Insert cell
viewof incomingRequest = flowQueue({
timeout_ms: 10000
})
Insert cell
Insert cell
requestRouter = {
const session = incomingRequest.session;
const request = incomingRequest.request;
const commandHandler = {
s: viewof incomingStats,
p: viewof incomingPut,
q: viewof incomingQuery,
g: viewof incomingGet
}[request.d.a];

if (!commandHandler) {
return viewof incomingRequest.reject(
new Error("Unrecognised server action " + request.d.a)
);
}

commandHandler
.send({
session,
command: request.d.b
})
.then((handlerResponse) => {
viewof incomingRequest.respond(handlerResponse);
})
.catch(viewof incomingRequest.reject);
}
Insert cell
Insert cell
viewof incomingStats = flowQueue({
timeout_ms: 5000
})
Insert cell
incomingStats
Insert cell
incomingStatsAction = {
incomingStats; // Trigger
const response = { s: "ok", d: "" };
viewof incomingStats.respond(response);
return response;
}
Insert cell
Insert cell
viewof incomingPut = flowQueue({
timeout_ms: 5000
})
Insert cell
incomingPut
Insert cell
incomingPutAction = {
const path = incomingPut.command.p;
const data = incomingPut.command.d;
try {
await redis.sendCommand([
"SET",
"firebase-server-prototype-1" + path,
JSON.stringify(data)
]);

// We also need to tell all interested parties
const listeners = pathToListeners[path] || [];
listeners.forEach((session) => {
session.serverToClientQueue.push({
t: "d",
d: {
a: "d",
b: {
p: path,
d: data
}
}
});

// TODO, we should not be doing long poll specific stuff here
if (session.hangingRequest) {
const res = session.hangingRequest;
session.hangingRequest = null;
res.send(
`pRTLPCB(${session.responseId++},${JSON.stringify([
session.serverToClientQueue.shift()
])});`
);
}
});

const response = { s: "ok", d: "" };
viewof incomingPut.respond(response);
return response;
} catch (err) {
viewof incomingPut.reject(err);
return err;
}
}
Insert cell
Insert cell
viewof incomingGet = flowQueue({
timeout_ms: 5000
})
Insert cell
incomingGet
Insert cell
incomingGetAction = {
const path = incomingGet.command.p;
const query = incomingGet.command.q;

// TODO permissions check
// { s: "permission_denied", d: "Permission denied" };
try {
const value = await redis.sendCommand([
"GET",
"firebase-server-prototype-1" + path
]);
const response = { s: "ok", d: JSON.parse(value) };
viewof incomingGet.respond(response);
return response;
} catch (err) {
viewof incomingGet.reject(err);
return err;
}
}
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
incomingQueryAction = {
// Actions: Listen/Query
const session = incomingQuery.session;
const command = incomingQuery.command;

try {
const initial = await redis.sendCommand([
"GET",
"firebase-server-prototype-1" + command.p
]);

// Initial data update sent immediately
session.serverToClientQueue.push({
t: "d",
d: {
a: "d",
b: {
p: command.p,
d: JSON.parse(initial)
}
}
});

// Now register as a listener
pathToListeners[command.p] ||= [];
pathToListeners[command.p].push(session);

// Follow up with a OK to the query
const response = { s: "ok", d: "" };
viewof incomingQuery.respond(response);
return response;
} catch (err) {
viewof incomingQuery.reject(err);
return err;
}
}
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell

One platform to build and deploy the best data apps

Experiment and prototype by building visualizations in live JavaScript notebooks. Collaborate with your team and decide which concepts to build out.
Use Observable Framework to build data apps locally. Use data loaders to build in any language or library, including Python, SQL, and R.
Seamlessly deploy to Observable. Test before you ship, use automatic deploy-on-commit, and ensure your projects are always up-to-date.
Learn more