Published
Edited
Jun 8, 2022
2 forks
Importers
16 stars
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
// Connection options passed to createClient(): host redis.webcode.run,
// port 443, with TLS switched on.
// NOTE(review): the { socket: { host, port, tls } } shape presumably follows
// the Redis client library wrapped by createRedisClient — confirm there.
redisConfig = ({
socket: {
host: "redis.webcode.run",
port: 443,
tls: true
}
})
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
/**
 * Open a Redis connection and pair it with a client id.
 *
 * @param redisConfig options forwarded unchanged to createRedisClient
 * @param client_id   identifier stored alongside the connection
 * @returns a promise of the client handle `{ redis, client_id }`
 */
createClient = async (redisConfig, client_id) => {
  const redis = await createRedisClient(redisConfig);
  return { redis, client_id };
}
Insert cell
Insert cell
Insert cell
exampleClient = {
restartClients; // Depend on restartClients cell so we can cycle this client
return createClient(redisConfig, "exampleClient");
}
Insert cell
Insert cell
// Button input; exampleClient references its value so that the example client
// can be recreated on demand.
viewof restartClients = Inputs.button("restart clients")
Insert cell
Insert cell
// Manual connectivity check: sends PING on the example client's connection
// and displays whatever sendCommand returns.
exampleClient.redis.sendCommand(["PING"])
Insert cell
// Connectivity test: the example client must answer PING with PONG.
suite.test("Example client responds to PING with PONG", async () => {
  const reply = await exampleClient.redis.sendCommand(["PING"]);
  expect(reply).toBe("PONG");
})
Insert cell
Insert cell
/**
 * Reset a client's state: delete its operation and notification queues, set
 * the operation head to "0" and the notification head to "0-0".
 *
 * All four commands are issued synchronously, in the order listed, before
 * anything is awaited, so the order in which they are sent is unchanged.
 *
 * @param client handle `{ redis, client_id }`
 * @returns a promise that resolves to the reply of the final command
 *          (set_head_notify_id) once all four commands have completed, and
 *          rejects if any of them fails.
 */
init_client = ({ redis, client_id }) => {
  const client = { redis, client_id };
  const pending = [
    clear_operations(client),
    clear_notifications(client),
    set_head_operation_id(client, "0"),
    set_head_notify_id(client, "0-0")
  ];
  // Waiting on every command (rather than only the last) means a failure in
  // any of them reaches the caller instead of becoming an unhandled rejection.
  return Promise.all(pending).then((replies) => replies[replies.length - 1]);
}
Insert cell
Insert cell
/** Redis key of the operation ("actions") queue for `client_id`. */
client_operation_queue_key = (client_id) => {
  const prefix = `c-${client_id}`;
  return `${prefix}-actions`;
}
Insert cell
/** Redis key of the notification ("replies") queue for `client_id`. */
client_notify_queue_key = (client_id) => {
  const prefix = `c-${client_id}`;
  return `${prefix}-replies`;
}
Insert cell
Insert cell
Insert cell
/**
 * Delete the client's operation queue key.
 * @returns whatever redis.sendCommand returns for the DEL command
 */
clear_operations = ({ redis, client_id } = {}) => {
  const queueKey = client_operation_queue_key(client_id);
  return redis.sendCommand(["DEL", queueKey]);
}
Insert cell
/**
 * Delete the client's notification queue key.
 * @returns whatever redis.sendCommand returns for the DEL command
 */
clear_notifications = ({ redis, client_id } = {}) => {
  const queueKey = client_notify_queue_key(client_id);
  return redis.sendCommand(["DEL", queueKey]);
}
Insert cell
Insert cell
Insert cell
/** Redis key holding the head id of the operation queue for `client_id`. */
client_operation_head_id_key = (client_id) => {
  const prefix = `c-${client_id}`;
  return `${prefix}-actions-head`;
}
Insert cell
/** Redis key holding the head id of the notification queue for `client_id`. */
client_notify_head_id_key = (client_id) => {
  const prefix = `c-${client_id}`;
  return `${prefix}-replies-head`;
}
Insert cell
Insert cell
/**
 * Store `value` as the client's operation-queue head id (Redis SET).
 * @returns whatever redis.sendCommand returns for the SET command
 */
set_head_operation_id = ({ redis, client_id } = {}, value) => {
  const headKey = client_operation_head_id_key(client_id);
  return redis.sendCommand(["SET", headKey, value]);
}
Insert cell
/**
 * Store `value` as the client's notification-queue head id (Redis SET).
 * @returns whatever redis.sendCommand returns for the SET command
 */
set_head_notify_id = ({ redis, client_id } = {}, value) => {
  const headKey = client_notify_head_id_key(client_id);
  return redis.sendCommand(["SET", headKey, value]);
}
Insert cell
Insert cell
/**
 * Read the client's operation-queue head id (Redis GET).
 * @returns whatever redis.sendCommand returns for the GET command
 */
get_head_operation_id = ({ redis, client_id } = {}) => {
  const headKey = client_operation_head_id_key(client_id);
  return redis.sendCommand(["GET", headKey]);
}
Insert cell
/**
 * Read the client's notification-queue head id (Redis GET).
 * @returns whatever redis.sendCommand returns for the GET command
 */
get_head_notify_id = ({ redis, client_id } = {}) => {
  const headKey = client_notify_head_id_key(client_id);
  return redis.sendCommand(["GET", headKey]);
}
Insert cell
Insert cell
// Verifies that init_client leaves both heads at their initial ids and both
// queues empty, even when the operation queue held data beforehand.
suite.test("init_client resets queues and heads", async () => {
  const client = await createClient(redisConfig, "init_client");
  // Seed the queue with a well-formed operation. An empty object would build
  // an XADD with id "0-undefined" and no fields, so nothing would be enqueued
  // and the reset below would have nothing to clear.
  await enqueue_operation(client, { request_id: "1", payload: "stale" });

  await init_client(client); // reset the client

  expect(await get_head_operation_id(client)).toBe("0");
  expect(await get_head_notify_id(client)).toBe("0-0");
  expect(await next_operation(client)).toBe(null);
  expect(await next_notify(client)).toBe(null);
})
Insert cell
Insert cell
/** Redis key of the long-poll session hash for `client_id`. */
client_longpoll_session_key = (client_id) => {
  const prefix = `c-${client_id}`;
  return `${prefix}-lp`;
}
Insert cell
/**
 * Start a long-poll session for `client`: reset the client's state via
 * init_client and store `password` in the client's session hash (HSET).
 *
 * Both operations are started synchronously, in that order, before the first
 * await, so the order in which commands are sent is unchanged.
 *
 * @param client   handle `{ redis, client_id }`
 * @param options  `{ password }` to record for the session
 * @returns a promise of `{ client, password }` that resolves once the reset
 *          and the HSET have completed, and rejects if either fails.
 */
createLongpollSession = async (client, { password } = {}) => {
  const reset = init_client(client);
  const stored = client.redis.sendCommand([
    "HSET",
    client_longpoll_session_key(client.client_id),
    "password",
    password
  ]);
  // Awaiting both surfaces failures to the caller instead of leaving them as
  // unhandled rejections behind an already-resolved session.
  await Promise.all([reset, stored]);
  return { client, password };
}
Insert cell
/**
 * Load the long-poll session stored for `client`.
 *
 * @param client handle `{ redis, client_id }`
 * @returns a promise of `{ client, password }`, where password is the first
 *          element of the HMGET reply for the "password" field
 */
retrieveLongpollSession = async (client) => {
  const sessionKey = client_longpoll_session_key(client.client_id);
  const fields = await client.redis.sendCommand(["HMGET", sessionKey, "password"]);
  const password = fields[0];
  return { client, password };
}
Insert cell
// Round-trip test: a password written by createLongpollSession must be the
// one retrieveLongpollSession reads back.
suite.test(
  "createLongpollSession can set a password that is visible to retrieveLongpollSession",
  async () => {
    const expectedPassword = String(Math.random());
    // Intentionally not awaited, as in the other tests here: the read below is
    // issued afterwards on the same client.
    createLongpollSession(exampleClient, { password: expectedPassword });
    const { password } = await retrieveLongpollSession(exampleClient);
    expect(password).toBe(expectedPassword);
  }
)
Insert cell
Insert cell
Insert cell
/**
 * Increment the client's long-poll response counter (Redis INCR).
 *
 * @returns a promise of the INCR reply minus one
 */
incrementLongpollResponseNum = async ({ redis, client_id }) => {
  const counterKey = client_longpoll_response_num_key(client_id);
  const incremented = await redis.sendCommand(["INCR", counterKey]);
  return incremented - 1;
}
Insert cell
// Two consecutive calls must return consecutive numbers.
suite.test(
  "incrementLongpollResponseNum increases by one each time",
  async () => {
    const first = await incrementLongpollResponseNum(exampleClient);
    const second = await incrementLongpollResponseNum(exampleClient);
    expect(second).toBe(first + 1);
  }
)
Insert cell
Insert cell
/** Redis key under which the value for `location` is stored. */
data_key = (location) => {
  const suffix = "data";
  return `${location}-${suffix}`;
}
Insert cell
/** Redis key of the listener list for `location`. */
data_listeners_key = (location) => {
  const suffix = "listeners";
  return `${location}-${suffix}`;
}
Insert cell
Insert cell
/**
 * Write `value` at `location` (Redis SET on the location's data key).
 * @returns whatever redis.sendCommand returns for the SET command
 */
set_data = ({ redis, client_id } = {}, location, value) => {
  const key = data_key(location);
  return redis.sendCommand(["SET", key, value]);
}
Insert cell
/**
 * Read the value at `location` (Redis GET on the location's data key).
 * @returns whatever redis.sendCommand returns for the GET command
 */
get_data = ({ redis, client_id } = {}, location) => {
  const key = data_key(location);
  return redis.sendCommand(["GET", key]);
}
Insert cell
// Writes a random value and checks that an immediate read returns it.
suite.test(
  "Read our writes. get_data retreives data previously set by set_data",
  async () => {
    const location = "read_our_writes_test";
    const written = JSON.stringify(Math.random());
    // The write is intentionally not awaited before the read is issued.
    set_data(exampleClient, location, written);
    const readBack = await get_data(exampleClient, location);
    expect(readBack).toBe(written);
  }
)
Insert cell
Insert cell
/**
 * Register this client as a listener of `location` by pushing its client_id
 * onto the location's listener list (Redis LPUSH).
 * @returns whatever redis.sendCommand returns for the LPUSH command
 */
add_data_listener = ({ redis, client_id } = {}, location) => {
  const listKey = data_listeners_key(location);
  return redis.sendCommand(["LPUSH", listKey, client_id]);
}
Insert cell
/**
 * Remove one occurrence of this client's id from the listener list of
 * `location` (Redis LREM with count "1").
 * @returns whatever redis.sendCommand returns for the LREM command
 */
remove_data_listener = ({ redis, client_id } = {}, location) => {
  const listKey = data_listeners_key(location);
  return redis.sendCommand(["LREM", listKey, "1", client_id]);
}
Insert cell
/**
 * Fetch the full listener list of `location` (Redis LRANGE 0 -1).
 * @returns whatever redis.sendCommand returns for the LRANGE command
 */
get_data_listeners = ({ redis, client_id } = {}, location) => {
  const listKey = data_listeners_key(location);
  return redis.sendCommand(["LRANGE", listKey, "0", "-1"]);
}
Insert cell
/**
 * Delete the listener list of `location` (Redis DEL).
 * @returns whatever redis.sendCommand returns for the DEL command
 */
clear_data_listeners = ({ redis, client_id } = {}, location) => {
  const listKey = data_listeners_key(location);
  return redis.sendCommand(["DEL", listKey]);
}
Insert cell
// Exercises the listener list: empty after clear, contains the client after
// add, empty again after remove. All commands are issued up front and their
// replies are awaited afterwards.
suite.test(
  "Listeners: add_data_listener, remove_data_listener, get_data_listeners, clear_data_listeners record client_id in a list",
  async () => {
    const location = "listeners-test";
    const snapshot = () => get_data_listeners(exampleClient, location);

    clear_data_listeners(exampleClient, location);
    const listenersAfterClear = snapshot();
    add_data_listener(exampleClient, location);
    const listenersAfterAdd = snapshot();
    remove_data_listener(exampleClient, location);
    const listenersAfterRemove = snapshot();

    expect(await listenersAfterClear).toEqual([]);
    expect(await listenersAfterAdd).toEqual([exampleClient.client_id]);
    expect(await listenersAfterRemove).toEqual([]);
  }
)
Insert cell
Insert cell
Insert cell
Insert cell
/**
 * Append `action` to the client's operation stream (Redis XADD), using
 * `0-<request_id>` as the entry id and the action's own key/value pairs as
 * the entry fields.
 *
 * If Redis rejects the id as "equal or smaller" than the stream's top entry,
 * the failure is logged and the returned promise resolves to undefined; any
 * other error is rethrown.
 *
 * @param client handle `{ redis, client_id }`
 * @param action object with a `request_id` plus the fields to store
 */
enqueue_operation = ({ redis, client_id } = {}, action) => {
  const streamKey = client_operation_queue_key(client_id);
  const entryId = `0-${action.request_id}`;
  const fieldPairs = Object.entries(action).flat();
  const command = ["XADD", streamKey, entryId, ...fieldPairs];

  return redis.sendCommand(command).catch((err) => {
    const isStaleId = err.message.includes(
      "ID specified in XADD is equal or smaller"
    );
    if (!isStaleId) {
      throw err;
    }
    console.log(`Non-monotonic request id ${action.request_id}`);
  });
}
Insert cell
Insert cell
/**
 * Read (without consuming) the first operation after the client's current
 * head id from its operation stream (XREAD COUNT 1 starting at `0-<head>`).
 *
 * @param client handle `{ redis, client_id }`
 * @returns a promise of the entry's fields as an object, or the falsy XREAD
 *          reply itself when nothing was returned
 * @throws Error when the reply is "QUEUED", i.e. the read was issued inside
 *         a transaction
 */
next_operation = async ({ redis, client_id } = {}) => {
  const headId = await get_head_operation_id({ redis, client_id });
  const reply = await redis.sendCommand([
    "XREAD",
    "COUNT",
    "1",
    "STREAMS",
    client_operation_queue_key(client_id),
    `0-${headId}`
  ]);
  if (reply === "QUEUED") {
    throw new Error("Can't use next_operation in transaction");
  }
  if (!reply) {
    return reply;
  }
  // reply[0][1][0] is the single entry of the single stream; its second
  // element is a flat [field, value, field, value, ...] list.
  const flatFields = reply[0][1][0][1];
  const operation = {};
  flatFields.forEach((value, position) => {
    if (position % 2 === 1) {
      operation[flatFields[position - 1]] = value;
    }
  });
  return operation;
}
Insert cell
Insert cell
/**
 * Acknowledge an operation by recording `stream_id` as the client's new
 * operation head id.
 * @returns whatever set_head_operation_id returns
 */
ack_operation = ({ redis, client_id } = {}, stream_id) =>
  set_head_operation_id({ redis, client_id }, stream_id)
Insert cell
// Walks the operation queue: duplicate request ids are dropped, reads do not
// consume, and acknowledging an id advances the head past it. Every command is
// issued in sequence first; replies are awaited at the end.
suite.test(
  "Client operation queue: enqueue_operation, next_operation, ack_operation",
  async () => {
    const client = await createClient(redisConfig, "operation_queue_client");
    init_client(client);

    const first = { request_id: "1", payload: "init" };
    const second = { request_id: "2", payload: "second" };

    enqueue_operation(client, { request_id: "1", payload: "init" });
    // If we write twice with the same request_id it is deduplicated
    enqueue_operation(client, { request_id: "1", payload: "dedupe_test" });
    enqueue_operation(client, { request_id: "2", payload: "second" });

    const beforeAck = next_operation(client);
    const beforeAckAgain = next_operation(client);

    ack_operation(client, "1");
    const afterFirstAck = next_operation(client);

    ack_operation(client, "2");
    const afterSecondAck = next_operation(client);

    expect(await beforeAck).toEqual(first);
    expect(await beforeAckAgain).toEqual(first);
    expect(await afterFirstAck).toEqual(second);
    expect(await afterSecondAck).toEqual(null);
  }
)
Insert cell
Insert cell
/**
 * Append `reply` to the notification stream of `target_client_id` (Redis
 * XADD with an auto-generated "*" id), storing the reply's key/value pairs
 * as the entry fields.
 *
 * @param client           handle whose `redis` connection sends the command
 * @param target_client_id client whose notification queue receives the entry
 * @param reply            object of fields to store
 * @returns whatever redis.sendCommand returns for the XADD command
 */
enqueue_notify = ({ redis, client_id } = {}, target_client_id, reply) => {
  const streamKey = client_notify_queue_key(target_client_id);
  const fieldPairs = Object.entries(reply).flat();
  return redis.sendCommand(["XADD", streamKey, "*", ...fieldPairs]);
}
Insert cell
// Example: the example client puts a { payload: "hi" } notification on its
// own notification queue; the cell displays the XADD result.
enqueue_notify(exampleClient, exampleClient.client_id, { payload: "hi" })
Insert cell
/**
 * Read (without consuming) the first notification after the client's current
 * notification head id (XREAD COUNT 1 on its notification stream).
 *
 * @param client handle `{ redis, client_id }`
 * @returns a promise of `[entryId, fields]`, where fields is the entry's
 *          field/value list turned into an object, or the falsy XREAD reply
 *          itself when nothing was returned
 */
next_notify = async ({ redis, client_id } = {}) => {
  const headId = await get_head_notify_id({ redis, client_id });
  const reply = await redis.sendCommand([
    "XREAD",
    "COUNT",
    "1",
    "STREAMS",
    client_notify_queue_key(client_id),
    headId
  ]);
  if (!reply) {
    return reply;
  }
  // reply[0][1][0] is the single entry of the single stream: [id, flatFields].
  const entry = reply[0][1][0];
  const entryId = entry[0];
  const flatFields = entry[1];
  const notification = {};
  flatFields.forEach((value, position) => {
    if (position % 2 === 1) {
      notification[flatFields[position - 1]] = value;
    }
  });
  return [entryId, notification];
}
Insert cell
/**
 * Acknowledge a notification by recording `id` as the client's new
 * notification head id.
 * @returns whatever set_head_notify_id returns
 */
ack_notify = ({ redis, client_id } = {}, id) =>
  set_head_notify_id({ redis, client_id }, id)
Insert cell
Insert cell
Insert cell
Insert cell
process_operation = (client) => viewof process_operation_args.send(client)
Insert cell
// Flow queue carrying the client passed to process_operation() into the
// reactive cells below (action, prerequisites, ...), which answer it through
// respond()/reject().
// NOTE(review): flowQueue is not defined in this section; timeout_ms is
// presumably the time allowed for a response, in milliseconds — confirm.
viewof process_operation_args = flowQueue({
name: "process_operation_args",
timeout_ms: 2000
})
Insert cell
Insert cell
// Displays the current value of the process_operation_args queue.
process_operation_args
Insert cell
Insert cell
// First reactive step of process_operation: for the client currently held by
// process_operation_args, fetch that client's next queued operation.
action = {
const client = process_operation_args;
// WATCH the operation head-id key; the command is sent without awaiting it.
client.redis.sendCommand([
"WATCH",
client_operation_head_id_key(client.client_id)
]);
const actionRaw = await next_operation(process_operation_args);
if (!actionRaw) {
// Nothing pending: drop the WATCH and answer the caller with "NOOP".
client.redis.sendCommand(["UNWATCH"]);
viewof process_operation_args.respond("NOOP");
// NOTE(review): returning `invalidation` presumably leaves this cell
// unresolved so that dependent cells do not run — confirm.
return invalidation;
} else {
return actionRaw;
}
}
Insert cell
Insert cell
// Second reactive step: read whatever the current action needs before the
// transaction is opened, WATCHing the key that is read.
//  - PUT:          WATCH the listeners key, return { listeners }
//  - GET / LISTEN: WATCH the data key, return { data }
//  - anything else: return {}
prerequisites = {
const client = process_operation_args;
const operation = action.action;
try {
if (operation === "PUT") {
// WATCH is sent without awaiting; the read that follows is awaited.
client.redis.sendCommand(["WATCH", data_listeners_key(action.key)]);
return {
listeners: await get_data_listeners(client, action.key)
};
} else if (operation === "GET" || operation === "LISTEN") {
client.redis.sendCommand(["WATCH", data_key(action.key)]);
return {
data: await get_data(client, action.key)
};
}
} catch (err) {
// A failed read releases the WATCHes and rejects the pending
// process_operation call.
client.redis.sendCommand(["UNWATCH"]);
console.error(err.message, action);
viewof process_operation_args.reject(err);
// NOTE(review): the error is returned, not thrown, so it becomes this
// cell's value and dependent cells presumably still run — confirm intent.
return err;
}
return {};
}
Insert cell
Insert cell
// Third reactive step: open a transaction (MULTI) and dispatch the current
// action to the flow queue of the matching operation handler, passing along
// the prerequisites, the client and the action.
process_operation_effect = {
const client = process_operation_args;
const operation = action.action;
try {
// MULTI is sent without awaiting its reply.
client.redis.sendCommand(["MULTI"]);
// Dispatch table: operation name -> handler flow queue.
const handler = {
PUT: viewof run_put_operation_args,
GET: viewof run_get_operation_args,
LISTEN: viewof run_listen_operation_args,
UNLISTEN: viewof run_unlisten_operation_args
}[operation];

if (handler === undefined)
throw new Error("Unknown operation " + operation);

return await handler.send({
...prerequisites,
client,
action
});
} catch (err) {
// Unknown operation or handler failure: abandon the transaction and
// reject the pending process_operation call.
console.error(err.message, action);
client.redis.sendCommand(["DISCARD"]);
viewof process_operation_args.reject(err);
// NOTE(review): as in prerequisites, the error is returned rather than
// thrown, so ack_process_operation presumably still runs — confirm.
return err;
}
}
Insert cell
Insert cell
// Final reactive step: once process_operation_effect has produced a value,
// acknowledge the action (advance the operation head to its request_id), send
// EXEC, and answer the pending process_operation call with the EXEC result.
ack_process_operation = {
const client = process_operation_args;
// Referenced only to make this cell run after process_operation_effect.
process_operation_effect;
try {
ack_operation(client, action.request_id);
// NOTE(review): EXEC is not awaited, so `response` is whatever sendCommand
// returns (presumably a promise) and a rejected EXEC would not reach the
// catch block below — confirm this is intended.
const response = client.redis.sendCommand(["EXEC"]);
viewof process_operation_args.respond(response);
return response;
} catch (err) {
await client.redis.sendCommand(["DISCARD"]);
viewof process_operation_args.reject(err);
return err;
}
}
Insert cell
// End-to-end smoke test: enqueue PUT, LISTEN, PUT, GET, UNLISTEN for one
// client, process operations until the queue reports "NOOP", then drain the
// client's notification queue and compare the full reply history.
suite.test(
  "smoke test process_operation with LISTEN/PUT/GET/UNLISTEN",
  async () => {
    const client = await createClient(
      redisConfig,
      "process_operation_smoke_test"
    );
    clear_data_listeners(client, "foo");
    init_client(client);

    const script = [
      { request_id: "1", action: "PUT", key: "foo", value: "baz" },
      { request_id: "2", action: "LISTEN", key: "foo" },
      { request_id: "3", action: "PUT", key: "foo", value: "bar" },
      { request_id: "4", action: "GET", key: "foo" },
      { request_id: "5", action: "UNLISTEN", key: "foo" }
    ];
    script.forEach((operation) => {
      enqueue_operation(client, operation);
    });

    // Process one operation at a time until nothing is left.
    let outcome;
    do {
      outcome = await process_operation(client);
    } while (outcome !== "NOOP");

    // Drain the notification queue, acknowledging each entry as it is read.
    const history = [];
    for (
      let next = await next_notify(client);
      next;
      next = await next_notify(client)
    ) {
      const [id, reply] = next;
      history.push(reply);
      ack_notify(client, id);
    }

    expect(history).toEqual([
      { request_id: "1", status: "ok" },
      { action: "DATA", data: "baz", key: "foo" },
      { request_id: "2", status: "ok" },
      { action: "DATA", data: "bar", key: "foo" },
      { request_id: "3", status: "ok" },
      { data: "bar", request_id: "4", status: "ok" },
      { request_id: "5", status: "ok" }
    ]);
  }
)
Insert cell
Insert cell
// Collects the response cells of the four operation handlers.
// NOTE(review): presumably exists so that referencing `operations` pulls all
// four handler pipelines into the dependency graph — confirm.
operations = [
put_operation_response,
get_operation_response,
listen_operation_response,
unlisten_operation_response
]
Insert cell
Insert cell
// Flow queue for PUT: process_operation_effect sends
// { listeners, client, action } here, and put_operation_response answers it.
viewof run_put_operation_args = flowQueue({
name: "run_put_operation_args"
})
Insert cell
// Displays the current value of the run_put_operation_args queue.
run_put_operation_args
Insert cell
run_put_operation_effect = {
const client = run_put_operation_args.client;
const action = run_put_operation_args.action;
var result = set_data(client, action.key, action.value);
run_put_operation_args.listeners.forEach((client_id) => {
var result = enqueue_notify(client, client_id, {
action: "DATA",
key: action.key,
data: action.value
});
});
return enqueue_notify(client, client.client_id, {
request_id: action.request_id,
status: "ok"
});
}
Insert cell
// Answers the pending PUT request on the flow queue with the value produced
// by run_put_operation_effect.
put_operation_response = viewof run_put_operation_args.respond(
run_put_operation_effect
)
Insert cell
Insert cell
// Flow queue for GET: process_operation_effect sends { data, client, action }
// here, and get_operation_response answers it.
viewof run_get_operation_args = flowQueue({
name: "run_get_operation_args"
})
Insert cell
// Displays the current value of the run_get_operation_args queue.
run_get_operation_args
Insert cell
run_get_operation_effect = {
const client = run_get_operation_args.client;
const action = run_get_operation_args.action;
const data = run_get_operation_args.data;
return enqueue_notify(client, client.client_id, {
request_id: action.request_id,
status: "ok",
data
});
}
Insert cell
// Answers the pending GET request on the flow queue with the value produced
// by run_get_operation_effect.
get_operation_response = viewof run_get_operation_args.respond(
run_get_operation_effect
)
Insert cell
Insert cell
// Flow queue for LISTEN: process_operation_effect sends
// { data, client, action } here, and listen_operation_response answers it.
viewof run_listen_operation_args = flowQueue({
name: "run_listen_operation_args"
})
Insert cell
// Displays the current value of the run_listen_operation_args queue.
run_listen_operation_args
Insert cell
run_listen_effect = {
const client = run_listen_operation_args.client;
const action = run_listen_operation_args.action;
const data = run_listen_operation_args.data;
enqueue_notify(client, client.client_id, {
action: "DATA",
key: action.key,
data: data
});
add_data_listener(client, action.key);
return enqueue_notify(client, client.client_id, {
request_id: action.request_id,
status: "ok"
});
}
Insert cell
// Answers the pending LISTEN request on the flow queue with the value
// produced by run_listen_effect.
listen_operation_response = viewof run_listen_operation_args.respond(
run_listen_effect
)
Insert cell
Insert cell
// Flow queue for UNLISTEN: process_operation_effect sends { client, action }
// here, and unlisten_operation_response answers it.
viewof run_unlisten_operation_args = flowQueue({
name: "run_unlisten_operation_args"
})
Insert cell
// Displays the current value of the run_unlisten_operation_args queue.
run_unlisten_operation_args
Insert cell
run_unlisten_effect = {
const client = run_unlisten_operation_args.client;
const action = run_unlisten_operation_args.action;
remove_data_listener(client, action.key);
return enqueue_notify(client, client.client_id, {
request_id: action.request_id,
status: "ok"
});
}
Insert cell
// Answers the pending UNLISTEN request on the flow queue with the value
// produced by run_unlisten_effect.
unlisten_operation_response = viewof run_unlisten_operation_args.respond(
run_unlisten_effect
)
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell

Purpose-built for displays of data

Observable is your go-to platform for exploring data and creating expressive data visualizations. Use reactive JavaScript notebooks for prototyping and a collaborative canvas for visual data exploration and dashboard creation.
Learn more