Published
Edited
Jun 8, 2022
2 forks
Importers
16 stars
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
// Connection settings handed to createClient/createRedisClient by the cells
// below: the socket targets redis.webcode.run on port 443 with TLS enabled.
redisConfig = ({
socket: {
host: "redis.webcode.run",
port: 443,
tls: true
}
})
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
/**
 * Opens a Redis connection and pairs it with a client identifier.
 *
 * @param redisConfig configuration forwarded to createRedisClient
 * @param client_id identifier used to derive this client's Redis keys
 * @returns a promise for `{ redis, client_id }`
 */
createClient = async (redisConfig, client_id) => {
  const redis = await createRedisClient(redisConfig);
  return { redis, client_id };
}
Insert cell
Insert cell
Insert cell
// Shared client (client_id "exampleClient") used by the example and test
// cells. The cell yields whatever createClient returns for redisConfig.
exampleClient = {
restartClients; // Depend on restartClients cell so we can cycle this client
return createClient(redisConfig, "exampleClient");
}
Insert cell
Insert cell
// "restart clients" button; exampleClient references its value so that the
// example client can be cycled from the UI.
viewof restartClients = Inputs.button("restart clients")
Insert cell
Insert cell
// Sends PING over the example client's connection (the test cell below
// expects the reply "PONG").
exampleClient.redis.sendCommand(["PING"])
Insert cell
// Connectivity check: the example client's PING must be answered with PONG.
suite.test("Example client responds to PING with PONG", async () => {
  const reply = await exampleClient.redis.sendCommand(["PING"]);
  expect(reply).toBe("PONG");
})
Insert cell
Insert cell
/**
 * Resets a client's state: deletes its operation and notification queues and
 * sets the operation head to "0" and the notification head to "0-0".
 *
 * All four commands are issued immediately, in this order, without waiting
 * for replies, so callers that do not await still get the same command
 * ordering on the connection as before.
 *
 * @param client `{ redis, client_id }`
 * @returns a promise that resolves to the reply of the final command
 *   (setting the notification head) and rejects if any of the four fails
 */
init_client = ({ redis, client_id }) => {
  const client = { redis, client_id };
  const pending = [
    clear_operations(client),
    clear_notifications(client),
    set_head_operation_id(client, "0"),
    set_head_notify_id(client, "0-0")
  ];
  // Fold every command into the result so no failure goes unobserved.
  return Promise.all(pending).then((replies) => replies[replies.length - 1]);
}
Insert cell
Insert cell
/** Redis key of the operation (action) stream for `client_id`. */
client_operation_queue_key = (client_id) => {
  const prefix = `c-${client_id}`;
  return `${prefix}-actions`;
}
Insert cell
/** Redis key of the notification (reply) stream for `client_id`. */
client_notify_queue_key = (client_id) => {
  const prefix = `c-${client_id}`;
  return `${prefix}-replies`;
}
Insert cell
Insert cell
Insert cell
/**
 * Deletes the client's operation queue key.
 * @returns whatever redis.sendCommand returns for the DEL command
 */
clear_operations = ({ redis, client_id } = {}) => {
  const queueKey = client_operation_queue_key(client_id);
  return redis.sendCommand(["DEL", queueKey]);
}
Insert cell
/**
 * Deletes the client's notification queue key.
 * @returns whatever redis.sendCommand returns for the DEL command
 */
clear_notifications = ({ redis, client_id } = {}) => {
  const queueKey = client_notify_queue_key(client_id);
  return redis.sendCommand(["DEL", queueKey]);
}
Insert cell
Insert cell
Insert cell
/** Redis key holding the operation-queue head id for `client_id`. */
client_operation_head_id_key = (client_id) => {
  const prefix = `c-${client_id}`;
  return `${prefix}-actions-head`;
}
Insert cell
/** Redis key holding the notification-queue head id for `client_id`. */
client_notify_head_id_key = (client_id) => {
  const prefix = `c-${client_id}`;
  return `${prefix}-replies-head`;
}
Insert cell
Insert cell
/**
 * Stores `value` as the client's operation head id.
 * @returns whatever redis.sendCommand returns for the SET command
 */
set_head_operation_id = ({ redis, client_id } = {}, value) => {
  const headKey = client_operation_head_id_key(client_id);
  return redis.sendCommand(["SET", headKey, value]);
}
Insert cell
/**
 * Stores `value` as the client's notification head id.
 * @returns whatever redis.sendCommand returns for the SET command
 */
set_head_notify_id = ({ redis, client_id } = {}, value) => {
  const headKey = client_notify_head_id_key(client_id);
  return redis.sendCommand(["SET", headKey, value]);
}
Insert cell
Insert cell
/**
 * Reads the client's operation head id.
 * @returns whatever redis.sendCommand returns for the GET command
 */
get_head_operation_id = ({ redis, client_id } = {}) => {
  const headKey = client_operation_head_id_key(client_id);
  return redis.sendCommand(["GET", headKey]);
}
Insert cell
/**
 * Reads the client's notification head id.
 * @returns whatever redis.sendCommand returns for the GET command
 */
get_head_notify_id = ({ redis, client_id } = {}) => {
  const headKey = client_notify_head_id_key(client_id);
  return redis.sendCommand(["GET", headKey]);
}
Insert cell
Insert cell
// Verifies that init_client leaves both heads at their initial values and
// both queues empty, even when the operation queue held an entry beforehand.
suite.test("init_client resets queues and heads", async () => {
  const client = await createClient(redisConfig, "init_client");
  // Put some data in the queue. enqueue_operation derives the stream ID from
  // request_id and XADD needs at least one field, so an empty action would
  // be rejected instead of queueing anything.
  await enqueue_operation(client, { request_id: "1", payload: "stale" });

  await init_client(client); // reset the client

  expect(await get_head_operation_id(client)).toBe("0");
  expect(await get_head_notify_id(client)).toBe("0-0");
  expect(await next_operation(client)).toBe(null);
  expect(await next_notify(client)).toBe(null);
})
Insert cell
Insert cell
/** Redis key of the long-poll session hash for `client_id`. */
client_longpoll_session_key = (client_id) => {
  const prefix = `c-${client_id}`;
  return `${prefix}-lp`;
}
Insert cell
/**
 * Starts a long-poll session for `client`: resets the client's queues/heads
 * and stores `password` in the client's long-poll session hash.
 *
 * Both steps are issued synchronously (reset first, then HSET) before
 * anything is awaited, so commands a caller sends right after calling this
 * function still follow them on the connection. The returned promise now
 * waits for both steps and rejects if either fails, instead of leaving the
 * HSET promise unobserved.
 *
 * @param client `{ redis, client_id }`
 * @param options `{ password }` value stored under the "password" field
 * @returns a promise for `{ client, password }`
 */
createLongpollSession = async (client, { password } = {}) => {
  const initialized = init_client(client);
  const stored = client.redis.sendCommand([
    "HSET",
    client_longpoll_session_key(client.client_id),
    "password",
    password
  ]);
  await Promise.all([initialized, stored]);
  return { client, password };
}
Insert cell
/**
 * Reads the "password" field of the client's long-poll session hash.
 *
 * @param client `{ redis, client_id }`
 * @returns a promise for `{ client, password }`, where password is the
 *   first element of the HMGET reply
 */
retrieveLongpollSession = async (client) => {
  const sessionKey = client_longpoll_session_key(client.client_id);
  const fields = await client.redis.sendCommand(["HMGET", sessionKey, "password"]);
  return { client, password: fields[0] };
}
Insert cell
// Round-trips a random password through the long-poll session hash.
suite.test(
  "createLongpollSession can set a password that is visible to retrieveLongpollSession",
  async () => {
    const randomPassword = Math.random().toString();
    // Await so a failure while creating the session fails this test rather
    // than surfacing as an unhandled rejection.
    await createLongpollSession(exampleClient, { password: randomPassword });
    const retrieveSession = await retrieveLongpollSession(exampleClient);
    expect(retrieveSession.password).toBe(randomPassword);
  }
)
Insert cell
Insert cell
Insert cell
/**
 * Increments the client's long-poll response counter with INCR.
 *
 * @returns a promise for the INCR reply minus one, i.e. the counter value
 *   from before this increment
 */
incrementLongpollResponseNum = async ({ redis, client_id }) => {
  const incremented = await redis.sendCommand([
    "INCR",
    client_longpoll_response_num_key(client_id)
  ]);
  return incremented - 1;
}
Insert cell
// Two consecutive calls must return values that differ by exactly one.
suite.test(
  "incrementLongpollResponseNum increases by one each time",
  async () => {
    const first = await incrementLongpollResponseNum(exampleClient);
    const second = await incrementLongpollResponseNum(exampleClient);
    expect(second).toBe(first + 1);
  }
)
Insert cell
Insert cell
/** Redis key under which the value at `location` is stored. */
data_key = (location) => {
  const suffix = "data";
  return `${location}-${suffix}`;
}
Insert cell
/** Redis key of the listener list for `location`. */
data_listeners_key = (location) => {
  const suffix = "listeners";
  return `${location}-${suffix}`;
}
Insert cell
Insert cell
/**
 * Writes `value` at `location` with SET.
 * @returns whatever redis.sendCommand returns
 */
set_data = ({ redis, client_id } = {}, location, value) => {
  const key = data_key(location);
  return redis.sendCommand(["SET", key, value]);
}
Insert cell
/**
 * Reads the value at `location` with GET.
 * @returns whatever redis.sendCommand returns
 */
get_data = ({ redis, client_id } = {}, location) => {
  const key = data_key(location);
  return redis.sendCommand(["GET", key]);
}
Insert cell
// Writes a random value and checks that the same client reads it back.
suite.test(
  "Read our writes. get_data retreives data previously set by set_data",
  async () => {
    const data = JSON.stringify(Math.random());
    // Await the write so a failed SET fails the test instead of becoming an
    // unhandled rejection.
    await set_data(exampleClient, "read_our_writes_test", data);
    const fetched = await get_data(exampleClient, "read_our_writes_test");
    expect(fetched).toBe(data);
  }
)
Insert cell
Insert cell
/**
 * Registers `client_id` as a listener of `location` (LPUSH onto the list).
 * @returns whatever redis.sendCommand returns
 */
add_data_listener = ({ redis, client_id } = {}, location) => {
  const listKey = data_listeners_key(location);
  return redis.sendCommand(["LPUSH", listKey, client_id]);
}
Insert cell
/**
 * Removes one occurrence of `client_id` from the listener list of
 * `location` (LREM with count "1").
 * @returns whatever redis.sendCommand returns
 */
remove_data_listener = ({ redis, client_id } = {}, location) => {
  const listKey = data_listeners_key(location);
  return redis.sendCommand(["LREM", listKey, "1", client_id]);
}
Insert cell
/**
 * Lists every listener recorded for `location` (LRANGE 0 -1).
 * @returns whatever redis.sendCommand returns
 */
get_data_listeners = ({ redis, client_id } = {}, location) => {
  const listKey = data_listeners_key(location);
  return redis.sendCommand(["LRANGE", listKey, "0", "-1"]);
}
Insert cell
/**
 * Deletes the whole listener list of `location`.
 * @returns whatever redis.sendCommand returns for the DEL command
 */
clear_data_listeners = ({ redis, client_id } = {}, location) => {
  const listKey = data_listeners_key(location);
  return redis.sendCommand(["DEL", listKey]);
}
Insert cell
// Exercises the listener list: after clearing it is empty, after adding it
// holds the example client's id, after removing it is empty again. All
// commands are issued back to back and the snapshots awaited afterwards.
suite.test(
  "Listeners: add_data_listener, remove_data_listener, get_data_listeners, clear_data_listeners record client_id in a list",
  async () => {
    const location = "listeners-test";
    const snapshot = () => get_data_listeners(exampleClient, location);

    clear_data_listeners(exampleClient, location);
    const listenersWhenCleared = snapshot();
    add_data_listener(exampleClient, location);
    const listenersWhenAdded = snapshot();
    remove_data_listener(exampleClient, location);
    const listenersWhenRemoved = snapshot();

    expect(await listenersWhenCleared).toEqual([]);
    expect(await listenersWhenAdded).toEqual([exampleClient.client_id]);
    expect(await listenersWhenRemoved).toEqual([]);
  }
)
Insert cell
Insert cell
Insert cell
Insert cell
/**
 * Appends `action` to the client's operation stream with XADD, using
 * `0-<request_id>` as the entry ID and the action's own entries as the
 * field/value pairs.
 *
 * A rejection whose message mentions "ID specified in XADD is equal or
 * smaller" is logged and swallowed (the promise then resolves to
 * undefined); any other error is rethrown.
 *
 * @param client `{ redis, client_id }`
 * @param action object with a `request_id` plus the fields to store
 */
enqueue_operation = ({ redis, client_id } = {}, action) => {
  const streamKey = client_operation_queue_key(client_id);
  const entryId = `0-${action.request_id}`;
  const fieldsAndValues = Object.entries(action).flat();

  const tolerateStaleId = (err) => {
    if (!err.message.includes("ID specified in XADD is equal or smaller")) {
      throw err;
    }
    console.log(`Non-monotonic request id ${action.request_id}`);
  };

  return redis
    .sendCommand(["XADD", streamKey, entryId, ...fieldsAndValues])
    .catch(tolerateStaleId);
}
Insert cell
Insert cell
/**
 * Reads (without consuming) the first operation stored after the client's
 * operation head id.
 *
 * @param client `{ redis, client_id }`
 * @returns a promise for the entry's fields as an object, or the falsy
 *   XREAD reply (e.g. null) when nothing is pending
 * @throws Error when the reply is "QUEUED", i.e. the call was made inside a
 *   transaction
 */
next_operation = async ({ redis, client_id } = {}) => {
  const headId = await get_head_operation_id({ redis, client_id });
  const reply = await redis.sendCommand([
    "XREAD",
    "COUNT",
    "1",
    "STREAMS",
    client_operation_queue_key(client_id),
    `0-${headId}`
  ]);
  if (reply === "QUEUED") {
    throw new Error("Can't use next_operation in transaction");
  }
  if (!reply) return reply;

  // reply[0][1][0] is the single entry read: [entryId, [field, value, ...]]
  const flatFields = reply[0][1][0][1];
  const operation = {};
  for (let i = 1; i < flatFields.length; i += 2) {
    operation[flatFields[i - 1]] = flatFields[i];
  }
  return operation;
}
Insert cell
Insert cell
/**
 * Acknowledges operations up to `stream_id` by storing it as the client's
 * operation head id.
 * @returns whatever set_head_operation_id returns
 */
ack_operation = ({ redis, client_id } = {}, stream_id) =>
  set_head_operation_id({ redis, client_id }, stream_id)
Insert cell
// End-to-end check of the per-client operation queue. Every command below is
// issued without awaiting and the results are awaited only at the end, so the
// expectations depend on the statements staying in exactly this order.
// NOTE(review): this presumably relies on the connection processing commands
// in the order they were sent — confirm for createRedisClient.
suite.test(
"Client operation queue: enqueue_operation, next_operation, ack_operation",
async () => {
const client = await createClient(redisConfig, "operation_queue_client");
init_client(client);

enqueue_operation(client, {
request_id: "1",
payload: "init"
});
// If we write twice with the same request_id it is deduplicated
enqueue_operation(client, {
request_id: "1",
payload: "dedupe_test"
});

enqueue_operation(client, {
request_id: "2",
payload: "second"
});

// Two reads before any ack: both are expected to see request 1.
const next1 = next_operation(client);
const next2 = next_operation(client);

ack_operation(client, "1");

// After acking "1" the next read is expected to see request 2.
const next3 = next_operation(client);

ack_operation(client, "2");

// After acking "2" nothing is left.
const next4 = next_operation(client);

expect(await next1).toEqual({
request_id: "1",
payload: "init"
});

expect(await next2).toEqual({
request_id: "1",
payload: "init"
});

expect(await next3).toEqual({
request_id: "2",
payload: "second"
});

expect(await next4).toEqual(null);
}
)
Insert cell
Insert cell
/**
 * Appends `reply` to the notification stream of `target_client_id` with
 * XADD, using "*" as the entry ID and the reply's own entries as the
 * field/value pairs.
 *
 * @param client `{ redis, client_id }`; only `redis` is used here
 * @param target_client_id client whose notification stream receives the entry
 * @param reply object whose entries become the stream fields
 * @returns whatever redis.sendCommand returns
 */
enqueue_notify = ({ redis, client_id } = {}, target_client_id, reply) => {
  const streamKey = client_notify_queue_key(target_client_id);
  const fieldsAndValues = Object.entries(reply).flat();
  return redis.sendCommand(["XADD", streamKey, "*", ...fieldsAndValues]);
}
Insert cell
// Example: the example client appends { payload: "hi" } to its own
// notification stream.
enqueue_notify(exampleClient, exampleClient.client_id, { payload: "hi" })
Insert cell
/**
 * Reads (without consuming) the first notification stored after the
 * client's notification head id.
 *
 * @param client `{ redis, client_id }`
 * @returns a promise for `[entryId, fieldsObject]`, or the falsy XREAD reply
 *   (e.g. null) when nothing is pending
 */
next_notify = async ({ redis, client_id } = {}) => {
  const headId = await get_head_notify_id({ redis, client_id });
  const reply = await redis.sendCommand([
    "XREAD",
    "COUNT",
    "1",
    "STREAMS",
    client_notify_queue_key(client_id),
    headId
  ]);
  if (!reply) return reply;

  // reply[0][1][0] is the single entry read: [entryId, [field, value, ...]]
  const entry = reply[0][1][0];
  const entryId = entry[0];
  const flatFields = entry[1];
  const notification = {};
  for (let i = 1; i < flatFields.length; i += 2) {
    notification[flatFields[i - 1]] = flatFields[i];
  }
  return [entryId, notification];
}
Insert cell
/**
 * Acknowledges notifications up to `id` by storing it as the client's
 * notification head id.
 * @returns whatever set_head_notify_id returns
 */
ack_notify = ({ redis, client_id } = {}, id) =>
  set_head_notify_id({ redis, client_id }, id)
Insert cell
Insert cell
Insert cell
Insert cell
// Entry point for processing one queued operation of `client`: sends the
// client into the process_operation_args flow queue and returns what send()
// returns (the smoke test awaits it and compares the result with "NOOP").
process_operation = (client) => viewof process_operation_args.send(client)
Insert cell
// Flow queue that carries the client passed to process_operation through the
// action / prerequisites / effect / ack cells, which answer it via respond()
// or reject().
// NOTE(review): timeout_ms presumably bounds how long a send() may wait for
// a response — confirm against flowQueue.
viewof process_operation_args = flowQueue({
name: "process_operation_args",
timeout_ms: 2000
})
Insert cell
Insert cell
// Shows the current value of the flow queue (the client being processed).
process_operation_args
Insert cell
Insert cell
// Step 1 of process_operation: fetch the next pending operation for the
// client currently in the process_operation_args queue.
action = {
const client = process_operation_args;
// WATCH the operation head key before reading it. The command is sent
// without awaiting its reply.
client.redis.sendCommand([
"WATCH",
client_operation_head_id_key(client.client_id)
]);
const actionRaw = await next_operation(process_operation_args);
if (!actionRaw) {
// Nothing pending: drop the watch and answer the caller with "NOOP".
client.redis.sendCommand(["UNWATCH"]);
viewof process_operation_args.respond("NOOP");
// NOTE(review): returning Observable's `invalidation` promise presumably
// keeps this cell pending so the dependent cells do not run — confirm.
return invalidation;
} else {
return actionRaw;
}
}
Insert cell
Insert cell
// Step 2 of process_operation: read the data the operation needs before the
// transaction starts, watching the key that is read.
//  - PUT          -> { listeners } for action.key
//  - GET / LISTEN -> { data } for action.key
//  - anything else -> {}
prerequisites = {
const client = process_operation_args;
const operation = action.action;
try {
if (operation === "PUT") {
client.redis.sendCommand(["WATCH", data_listeners_key(action.key)]);
return {
listeners: await get_data_listeners(client, action.key)
};
} else if (operation === "GET" || operation === "LISTEN") {
client.redis.sendCommand(["WATCH", data_key(action.key)]);
return {
data: await get_data(client, action.key)
};
}
} catch (err) {
// Failure while reading: drop all watches and reject the caller.
client.redis.sendCommand(["UNWATCH"]);
console.error(err.message, action);
viewof process_operation_args.reject(err);
// NOTE(review): the error becomes this cell's value, so dependent cells
// presumably still run with it as `prerequisites` — confirm this is intended.
return err;
}
return {};
}
Insert cell
Insert cell
// Step 3 of process_operation: open a transaction with MULTI and hand the
// operation to the flow queue of the matching handler (PUT, GET, LISTEN or
// UNLISTEN). The handler receives { ...prerequisites, client, action }.
process_operation_effect = {
const client = process_operation_args;
const operation = action.action;
try {
// Sent without awaiting; the handler's commands follow it.
client.redis.sendCommand(["MULTI"]);
const handler = {
PUT: viewof run_put_operation_args,
GET: viewof run_get_operation_args,
LISTEN: viewof run_listen_operation_args,
UNLISTEN: viewof run_unlisten_operation_args
}[operation];

if (handler === undefined)
throw new Error("Unknown operation " + operation);

return await handler.send({
...prerequisites,
client,
action
});
} catch (err) {
// Unknown operation or handler failure: abandon the transaction and reject
// the caller of process_operation.
console.error(err.message, action);
client.redis.sendCommand(["DISCARD"]);
viewof process_operation_args.reject(err);
// NOTE(review): the error becomes this cell's value, so ack_process_operation
// presumably still runs afterwards — confirm this is intended.
return err;
}
}
Insert cell
Insert cell
// Step 4 of process_operation: record the operation as handled (head id :=
// action.request_id), send EXEC, and answer the caller with the EXEC result.
ack_process_operation = {
const client = process_operation_args;
// Referenced only to make this cell run after the effect cell.
process_operation_effect;
try {
ack_operation(client, action.request_id);
// NOTE(review): the EXEC promise is not awaited, so the caller is answered
// with the promise itself and an asynchronous EXEC failure cannot reach the
// catch below — confirm this is intended.
const response = client.redis.sendCommand(["EXEC"]);
viewof process_operation_args.respond(response);
return response;
} catch (err) {
await client.redis.sendCommand(["DISCARD"]);
viewof process_operation_args.reject(err);
return err;
}
}
Insert cell
// Smoke test for the whole pipeline: queue five operations for one client,
// process them until the queue reports "NOOP", then drain the client's
// notification stream and compare it with the expected history.
suite.test(
  "smoke test process_operation with LISTEN/PUT/GET/UNLISTEN",
  async () => {
    const client = await createClient(
      redisConfig,
      "process_operation_smoke_test"
    );
    clear_data_listeners(client, "foo");
    init_client(client);
    enqueue_operation(client, {
      request_id: "1",
      action: "PUT",
      key: "foo",
      value: "baz"
    });
    enqueue_operation(client, {
      request_id: "2",
      action: "LISTEN",
      key: "foo"
    });
    enqueue_operation(client, {
      request_id: "3",
      action: "PUT",
      key: "foo",
      value: "bar"
    });
    enqueue_operation(client, {
      request_id: "4",
      action: "GET",
      key: "foo"
    });

    enqueue_operation(client, {
      request_id: "5",
      action: "UNLISTEN",
      key: "foo"
    });

    // Process one operation per iteration until nothing is left.
    while ((await process_operation(client)) !== "NOOP") {}

    const history = [];
    let next = await next_notify(client);
    while (next) {
      const [id, reply] = next;
      history.push(reply);
      // Await the ack so a failed head update fails the test instead of
      // becoming an unhandled rejection.
      await ack_notify(client, id);
      next = await next_notify(client);
    }
    expect(history).toEqual([
      {
        request_id: "1",
        status: "ok"
      },
      {
        action: "DATA",
        data: "baz",
        key: "foo"
      },
      {
        request_id: "2",
        status: "ok"
      },
      {
        action: "DATA",
        data: "bar",
        key: "foo"
      },
      {
        request_id: "3",
        status: "ok"
      },
      {
        data: "bar",
        request_id: "4",
        status: "ok"
      },
      {
        request_id: "5",
        status: "ok"
      }
    ]);
  }
)
Insert cell
Insert cell
// Collects the response cells of the four operation handlers in one array.
// NOTE(review): presumably exists so that depending on `operations` keeps
// every handler chain evaluated — confirm.
operations = [
put_operation_response,
get_operation_response,
listen_operation_response,
unlisten_operation_response
]
Insert cell
Insert cell
// Flow queue for PUT operations: process_operation_effect sends
// { ...prerequisites, client, action } into it and put_operation_response
// answers it.
viewof run_put_operation_args = flowQueue({
name: "run_put_operation_args"
})
Insert cell
// Shows the arguments of the PUT operation currently in the queue.
run_put_operation_args
Insert cell
run_put_operation_effect = {
const client = run_put_operation_args.client;
const action = run_put_operation_args.action;
var result = set_data(client, action.key, action.value);
run_put_operation_args.listeners.forEach((client_id) => {
var result = enqueue_notify(client, client_id, {
action: "DATA",
key: action.key,
data: action.value
});
});
return enqueue_notify(client, client.client_id, {
request_id: action.request_id,
status: "ok"
});
}
Insert cell
// Answers the PUT flow queue with the result of the PUT handler cell.
put_operation_response = viewof run_put_operation_args.respond(
run_put_operation_effect
)
Insert cell
Insert cell
// Flow queue for GET operations: process_operation_effect sends
// { ...prerequisites, client, action } into it and get_operation_response
// answers it.
viewof run_get_operation_args = flowQueue({
name: "run_get_operation_args"
})
Insert cell
// Shows the arguments of the GET operation currently in the queue.
run_get_operation_args
Insert cell
run_get_operation_effect = {
const client = run_get_operation_args.client;
const action = run_get_operation_args.action;
const data = run_get_operation_args.data;
return enqueue_notify(client, client.client_id, {
request_id: action.request_id,
status: "ok",
data
});
}
Insert cell
// Answers the GET flow queue with the result of the GET handler cell.
get_operation_response = viewof run_get_operation_args.respond(
run_get_operation_effect
)
Insert cell
Insert cell
// Flow queue for LISTEN operations: process_operation_effect sends
// { ...prerequisites, client, action } into it and listen_operation_response
// answers it.
viewof run_listen_operation_args = flowQueue({
name: "run_listen_operation_args"
})
Insert cell
// Shows the arguments of the LISTEN operation currently in the queue.
run_listen_operation_args
Insert cell
run_listen_effect = {
const client = run_listen_operation_args.client;
const action = run_listen_operation_args.action;
const data = run_listen_operation_args.data;
enqueue_notify(client, client.client_id, {
action: "DATA",
key: action.key,
data: data
});
add_data_listener(client, action.key);
return enqueue_notify(client, client.client_id, {
request_id: action.request_id,
status: "ok"
});
}
Insert cell
// Answers the LISTEN flow queue with the result of the LISTEN handler cell.
listen_operation_response = viewof run_listen_operation_args.respond(
run_listen_effect
)
Insert cell
Insert cell
// Flow queue for UNLISTEN operations: process_operation_effect sends
// { ...prerequisites, client, action } into it and
// unlisten_operation_response answers it.
viewof run_unlisten_operation_args = flowQueue({
name: "run_unlisten_operation_args"
})
Insert cell
// Shows the arguments of the UNLISTEN operation currently in the queue.
run_unlisten_operation_args
Insert cell
run_unlisten_effect = {
const client = run_unlisten_operation_args.client;
const action = run_unlisten_operation_args.action;
remove_data_listener(client, action.key);
return enqueue_notify(client, client.client_id, {
request_id: action.request_id,
status: "ok"
});
}
Insert cell
// Answers the UNLISTEN flow queue with the result of the UNLISTEN handler cell.
unlisten_operation_response = viewof run_unlisten_operation_args.respond(
run_unlisten_effect
)
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell

One platform to build and deploy the best data apps

Experiment and prototype by building visualizations in live JavaScript notebooks. Collaborate with your team and decide which concepts to build out.
Use Observable Framework to build data apps locally. Use data loaders to build in any language or library, including Python, SQL, and R.
Seamlessly deploy to Observable. Test before you ship, use automatic deploy-on-commit, and ensure your projects are always up-to-date.
Learn more