Public
Edited
Apr 23
Insert cell
Insert cell
// Observable mutable cell: the list of states appended by the `logger` observer.
mutable logged = [] // Rerun cell to reset
Insert cell
// Observable button input labelled "Retrigger".
// NOTE(review): presumably referenced by other cells to force a re-run on click; no reference is visible in this excerpt — confirm.
viewof trig = Inputs.button('Retrigger')
Insert cell
// Array of the integers 0 through 999 (each element equals its index).
many = Array.from({length:1000},(_,i)=>i)
Insert cell
Insert cell
// Observable cell: restarts the shared `store`, waits one second, queues two
// "decrement" actions and returns the `count` read through the store's consumer.
dep2 = {
// these updates are batched after the previous batch
// emulate starting/stopping
// and force dependency on dep1
store.stop()
store.start();
// The comma expression only mentions `dep1` so that this cell depends on it; its value is discarded.
dep1, await Promises.delay(1000);

store.dispatch({
type: "decrement"
});

// `meta` is extra payload; the reducer cell in this notebook only looks at `type`.
store.dispatch({
type: "decrement",
meta: 'hello'
});

// Read the current state through a tee'd branch of the consumer generator.
// NOTE(review): tee() calls store.consumer[Symbol.iterator](), which for a generator is the generator
// itself, so this next() does advance store.consumer; with no argument the consumer just re-reads
// store.getState(). Because `store` is created with batch:true the two dispatches above are deferred
// to a microtask, so the count returned here appears to be the value from before these decrements — confirm.
return tee(store.consumer)[0].next().value.count
}

Insert cell
// Store observer: replaces `mutable logged` with a new array that has `state` appended
// (Array.prototype.concat, so an array-valued state would be flattened one level).
logger = (state) => mutable logged = mutable logged.concat(state)
Insert cell
// Emulate heavy reducer
// Observable cell holding the reducer passed to the store. Each handled action builds a
// 128-element array of ones and sums it, so "increment"/"decrement" move `count` by 128.
// The first variant below (pure, returns a new object) is commented out; the second is active.

reducer = /*(state, action) => {
switch (action?.type) {
case "increment":
return { ...state, count: state.count + Array.from({length:128},(_,i)=>1).reduce((a,b)=>a+b) };
case "decrement":
return { ...state, count: state.count - Array.from({length:128},(_,i)=>1).reduce((a,b)=>a+b) };
default:
return state;
}
}*/

// Direct mutation
// Active variant: updates `state.count` in place and returns the same object reference,
// so callers holding the previous state object observe the change too.
(state, action) => {
// The IIFE applies the action (if recognised) and always hands back `state` itself.
const newState = (() => {
switch (action?.type) {
case "increment":
state.count += Array.from({length:128},(_,i)=>1).reduce((a,b)=>a+b);
break;
case "decrement":
state.count -= Array.from({length:128},(_,i)=>1).reduce((a,b)=>a+b);
break;
/*default:
return state;*/
}
// Unknown or missing action types leave the state untouched.
return state
})();

return newState
// Freeze the new state to prevent subsequent mutation
// Useful for single call guarantees but retaining the interface
// return Object.freeze(newState);
};
Insert cell
// Shared store for the demo cells: uses the `reducer` cell, starts at count -1,
// reports to the `logger` observer, with debug logging off and batching on.
store = new Batch({
reducer,
initialState: { count: -1 },
observers: [logger],
debug: false,
batch:true
})
Insert cell
// As long as there is memory available, #batchedTaskQueue() handles batches very quickly

/**
 * Small state store driven by two generators.
 *
 * - The "event loop" generator receives actions through `.next(action)`, runs the
 *   reducer and yields the resulting state.
 * - The "consumer" generator is fed every new state by the event loop and yields the
 *   latest one; `readState()` reads from it.
 *
 * With `batch` enabled (the default) `dispatch()` only queues the action; the queue is
 * drained in a microtask and observers are notified once per drained queue. With
 * `batch` disabled every dispatch is reduced and reported synchronously.
 */
class Batch {
  #loop;
  #taskQueue = [];
  #isProcessing = false; // true from the moment a drain is scheduled until the queue is empty

  /**
   * @param {object} [options]
   * @param {Function} [options.reducer] `(state, action) => state`; defaults to `defaultReducer`.
   * @param {*} [options.initialState] Defaults to `{ count: 0, okay: true }`.
   * @param {boolean} [options.debug=false] Log lifecycle events to the console.
   * @param {boolean} [options.batch=true] Queue actions and drain them in a microtask.
   * @param {Iterable<Function>} [options.observers] Callbacks invoked with a clone of the state.
   */
  constructor({ reducer, initialState, debug, batch, observers } = {}) {
    this.state = initialState ?? { count: 0, okay: true };
    this.reducer = reducer ?? this.defaultReducer;
    this.observers = new Set(observers);
    this.debug = debug ?? false;
    this.batch = batch ?? true;

    this.start();
  }

  /** Counter reducer used when no reducer is supplied; unknown actions return the state unchanged. */
  defaultReducer = (state, action) => {
    switch (action?.type) {
      case "increment":
        return { ...state, count: state.count + 1 };
      case "decrement":
        return { ...state, count: state.count - 1 };
      case "value":
        return { ...state };
      default:
        return state;
    }
  };

  /** (Re)creates both generators from the current state and primes them up to their first `yield`. */
  start() {
    if (this.running) this.stop();
    if (this.debug) {
      console.log("Init loop", this.state);
    }
    this.running = true;
    this.#loop = this.createEventLoop();
    this.consumer = this.createConsumer();
    this.#loop.next();
    this.consumer.next();
  }

  /** Finishes both generators; their `finally` blocks run. */
  stop() {
    this.running = false;
    this.#loop.return();
    this.consumer.return();
  }

  /**
   * Builds the reducing generator. Each `.next(action)` reduces the action, forwards
   * the new state to the consumer and yields it back to the caller.
   * @returns {Generator}
   */
  createEventLoop() {
    const that = this;
    const eventLoop = function* ({ reducer, state }) {
      try {
        while (true) {
          const action = yield state; // `action` is whatever was passed to .next()
          state = reducer(state, action);
          that.consumer.next(state);
          if (that.debug) {
            console.log("Iteration", state);
          }
        }
      } finally {
        if (that.debug) {
          console.log("Exit loop", state);
        }
      }
    };

    return eventLoop({ reducer: this.reducer, state: this.state });
  }

  /**
   * Builds the consumer generator. It yields the last state it was handed; when
   * resumed without a value it falls back to `getState()`.
   * @returns {Generator}
   */
  createConsumer() {
    const that = this;
    const consumer = function* (state) {
      try {
        while (true) {
          const newState = yield state;
          state = newState ?? that.getState();
        }
      } finally {
        if (that.debug) {
          console.log("Exit consumer", state);
        }
      }
    };
    return consumer(this.getState());
  }

  /**
   * Submits an action.
   * @param {*} action Passed to the reducer as-is.
   * @returns {this} The store, so calls can be chained.
   */
  dispatch(action) {
    if (!this.batch) {
      // Not batching: reduce and report immediately.
      this.state = this.#loop.next(action).value;
      this.notifyObservers();
      return this;
    }

    this.#taskQueue.push(action);
    if (!this.#isProcessing) {
      // First action since the last drain: schedule exactly one drain.
      this.#isProcessing = true;
      // Method 1: queueMicrotask(() => this.#processTaskQueue());
      // Method 2:
      queueMicrotask(() => this.#batchedTaskQueue());
    }

    return this;
  }

  // Method 1 (currently not wired into dispatch):
  // drains the queue one action per microtask.
  #processTaskQueue() {
    const processNextTask = () => {
      if (this.#taskQueue.length === 0) {
        // Release the guard before notifying so observers may dispatch again.
        this.#isProcessing = false;
        if (this.debug) console.log("Processed");
        this.notifyObservers();
        return;
      }

      const task = this.#taskQueue.shift();
      this.state = this.#loop.next(task).value;

      queueMicrotask(processNextTask);
    };

    processNextTask();
  }

  // Method 2:
  // drains the queue in chunks of at most `batchSize` actions per microtask.
  #batchedTaskQueue(batchSize = 512) {
    let readIndex = 0; // position of the next unprocessed action

    const processBatch = () => {
      let tasksProcessed = 0;

      while (readIndex < this.#taskQueue.length && tasksProcessed < batchSize) {
        this.state = this.#loop.next(this.#taskQueue[readIndex]).value;
        readIndex++;
        tasksProcessed++;
      }

      if (readIndex < this.#taskQueue.length) {
        queueMicrotask(processBatch);
        return;
      }

      // Empty the queue and release the guard BEFORE notifying: an observer that
      // dispatches during notification must start a fresh drain instead of having
      // its action wiped together with the finished queue.
      this.#taskQueue.length = 0;
      this.#isProcessing = false;
      if (this.debug) console.log("Processed");
      this.notifyObservers();
    };

    processBatch();
  }

  /** Calls every observer with its own deep clone of the current state. */
  notifyObservers() {
    if (this.debug) console.log("Forwarded");
    this.observers.forEach((observer) => observer(this.getState()));
  }

  /**
   * Registers an observer.
   * @param {Function} observer
   * @returns {Function} Call it to unregister the observer again.
   */
  addObserver(observer) {
    this.observers.add(observer);
    return () => this.removeObserver(observer);
  }

  /** Unregisters an observer; unknown observers are ignored. */
  removeObserver(observer) {
    // Set has no `remove` method; `delete` is the removal API.
    this.observers.delete(observer);
  }

  /** @returns {*} A `structuredClone` of the current state. */
  getState() {
    return structuredClone(this.state);
  }

  /**
   * @returns {Promise<*>} Resolves with `getState()` from a `setTimeout` callback, i.e.
   * after the microtasks that are already queued (including a pending drain) have run.
   */
  settle() {
    return new Promise((keep) => {
      setTimeout(() => keep(this.getState()));
    });
  }

  /** @returns {*} The state as seen by the consumer generator (advances it by one step). */
  readState() {
    return this.consumer.next().value;
  }
}

Insert cell
/**
 * Batch variant that drains its queue in time slices: it reduces queued actions for
 * at most `timeSlice` milliseconds, then hands control back via `setTimeout` before
 * continuing, and notifies observers once the queue is empty.
 *
 * Private members of `Batch` are not visible to subclasses, and a private method
 * cannot override a parent's private method. This class therefore keeps its own
 * queue, overrides the public `dispatch()`, and captures the reducing generator
 * through the public `createEventLoop()` hook.
 */
class TimeSlicedBatch extends Batch {
  // Generator per instance. A static WeakMap is used because createEventLoop() is
  // first called from inside the parent constructor, before this class's instance
  // fields exist.
  static #loops = new WeakMap();

  #queue = [];
  #draining = false; // true from scheduling a drain until the queue is empty
  #timeSlice = 5; // Maximum time (in ms) to spend processing tasks in one go

  /**
   * @param {object} [options] Same options as `Batch`, plus:
   * @param {number} [options.timeSlice=5] Milliseconds of work per slice.
   */
  constructor({ timeSlice = 5, ...options } = {}) {
    super(options);
    this.#timeSlice = timeSlice;
  }

  /** Creates the parent's event loop and remembers it for this instance. */
  createEventLoop() {
    const loop = super.createEventLoop();
    TimeSlicedBatch.#loops.set(this, loop);
    return loop;
  }

  /**
   * Queues the action for time-sliced processing; without batching it defers to the
   * parent's immediate path.
   * @param {*} action
   * @returns {this}
   */
  dispatch(action) {
    if (!this.batch) return super.dispatch(action);

    this.#queue.push(action);
    if (!this.#draining) {
      this.#draining = true;
      queueMicrotask(() => this.#batchedTaskQueue());
    }
    return this;
  }

  #batchedTaskQueue() {
    let readIndex = 0;

    const processSlice = () => {
      const loop = TimeSlicedBatch.#loops.get(this);
      // The deadline is taken per slice; a single start time for the whole drain
      // would make every task after the first slice yield immediately.
      const deadline = performance.now() + this.#timeSlice;

      while (readIndex < this.#queue.length) {
        this.state = loop.next(this.#queue[readIndex]).value;
        readIndex++;

        if (readIndex < this.#queue.length && performance.now() >= deadline) {
          // A timer (macrotask) is used because microtasks all run before control
          // returns to the event loop, so they would not yield to other work.
          setTimeout(processSlice, 0);
          return;
        }
      }

      // Reset before notifying so observers can dispatch follow-up actions.
      this.#queue.length = 0;
      this.#draining = false;
      if (this.debug) console.log("Processed");
      this.notifyObservers();
    };

    processSlice();
  }

  /**
   * @returns {Promise<*>} Resolves with `getState()` once no drain is pending. The
   * check is repeated on a timer because a drain may span several timer callbacks.
   */
  settle() {
    return new Promise((keep) => {
      const check = () => {
        if (this.#draining) {
          setTimeout(check);
        } else {
          keep(this.getState());
        }
      };
      setTimeout(check);
    });
  }
}
Insert cell
Insert cell
// not mine

/**
 * Splits one iterable into two independent generators that each yield every value
 * of the source. A value pulled by one branch is buffered for the other branch
 * until that branch reads it.
 *
 * @param {Iterable} iterable Source; its iterator is obtained immediately.
 * @returns {Generator[]} Array with the two branch generators.
 */
function tee(iterable) {
  const source = iterable[Symbol.iterator]();
  const pending = [[], []]; // per-branch backlog; substitute in queue type for efficiency
  const EXHAUSTED = Object.create(null); // unique sentinel, cannot collide with a yielded value

  // Next value for `branch`: its own backlog first, otherwise the shared source.
  function pull(branch) {
    const backlog = pending[branch];
    if (backlog.length > 0) {
      return backlog.shift();
    }

    const step = source.next();
    if (step.done) {
      return EXHAUSTED;
    }

    // Remember the value for the sibling branch before handing it out.
    pending[1 - branch].push(step.value);
    return step.value;
  }

  function* branchIterator(branch) {
    let item = pull(branch);
    while (item !== EXHAUSTED) {
      yield item;
      item = pull(branch);
    }
  }

  return [branchIterator(0), branchIterator(1)];
}

Insert cell
{
// Demo cell for BatchMap: build a two-entry map, chain two type-filter queries
// (each call dispatches an action and returns the map itself) and display the map.
let index = new BatchMap([[1,2],[new Date, {}]])
index.keys(Number).values(Object)
return index
//return index
}
Insert cell
/**
 * Map whose `keys()`, `values()` and `entries()` are type-filter queries.
 *
 * Each call dispatches `{ method, types }` and returns the map, so queries can be
 * chained. A generator-based event loop applies `reducer` to every dispatched query;
 * the reducer scans the map's entries and appends the matching pairs to `state`.
 * In batch mode (default) queries are queued and evaluated together in a microtask,
 * after which observers receive `getState()`.
 *
 * Result pairs: `keys` yields `[key, undefined]`, `values` yields `[undefined, value]`,
 * `entries` yields `[key, value]`. Results accumulate across queries for the lifetime
 * of the instance. Plain iteration (`for...of`, spread) still walks the real entries.
 */
class BatchMap extends Map {
  #loop;
  #taskQueue = [];
  #observers = new Set();
  #debug = false;
  #batch = true; // Batch mode defers until all microtasks have been executed
  #isProcessing = false; // true from scheduling a drain until the queue is empty

  /**
   * @param {Iterable} [data] Initial entries, as for `Map`.
   * @param {object} [options]
   * @param {boolean} [options.batch=true] Queue queries and evaluate them in a microtask.
   * @param {boolean} [options.debug=false] Log lifecycle events to the console.
   * @param {Iterable<Function>} [options.observers] Callbacks invoked with `getState()`.
   */
  constructor(data, { batch = true, debug = false, observers = [] } = {}) {
    super(data);
    this.#batch = batch;
    this.#debug = debug;
    this.#observers = new Set(observers);

    this.state = []; // accumulated result pairs
    this.start();
  }

  // True when `value` matches at least one of `types`.
  #matchesType(value, ...types) {
    return types.some((type) => this.compare(type, value));
  }

  /**
   * Tests `target` against `source`.
   * - If `source` has a `prototype` (a constructor such as `Number`), `target` matches
   *   when the constructor found on its prototype has that same prototype, or when
   *   `target` is `source` itself. `null`, `undefined` and objects without a
   *   prototype never match a constructor.
   * - Otherwise `source` is treated as a literal and compared by identity
   *   (`Object.is` or `===`).
   * @returns {boolean}
   */
  compare(source, target) {
    if (source?.prototype) {
      if (target === null || target === undefined) return false;
      const constructorOfTarget = Object.getPrototypeOf(target)?.constructor;
      return (
        constructorOfTarget?.prototype === source.prototype || source === target
      );
    }
    return Object.is(source, target) || source === target;
  }

  // Result pair for one entry under one query, or undefined when it does not match.
  // An empty `types` list matches everything.
  #select(method, types, key, value) {
    switch (method) {
      case "keys":
        return types.length === 0 || this.#matchesType(key, ...types)
          ? [key, undefined]
          : undefined;

      case "values":
        return types.length === 0 || this.#matchesType(value, ...types)
          ? [undefined, value]
          : undefined;

      case "entries": {
        // Key must match the first type; value must match one of the remaining types.
        const [keyType, ...valueTypes] = types;
        const keyMatches = types.length === 0 || this.#matchesType(key, keyType);
        const valueMatches =
          valueTypes.length === 0 || this.#matchesType(value, ...valueTypes);
        return keyMatches && valueMatches ? [key, value] : undefined;
      }

      default:
        return undefined;
    }
  }

  /**
   * Evaluates one query against the map's current entries.
   * @param {Array} state Result pairs accumulated so far.
   * @param {{method?: string, types?: Array}} [action] Query; unknown methods match nothing.
   * @returns {Array} New array: previous results followed by this query's matches.
   */
  reducer = (state, { method, types = [] } = {}) => {
    const previous = Array.isArray(state) ? state : [];
    const matches = [];
    // Iterating `this` uses Map's native entry iterator, not the overridden entries().
    for (const [key, value] of this) {
      const pair = this.#select(method, types, key, value);
      if (pair) matches.push(pair);
    }
    return [...previous, ...matches];
  };

  /**
   * Builds the reducing generator: each `.next(action)` runs the reducer and yields
   * the new state.
   * @returns {Generator}
   */
  createEventLoop() {
    const that = this;
    const eventLoop = function* ({ reducer, state }) {
      try {
        while (true) {
          const action = yield state;
          state = reducer(state, action);
          if (that.#debug) {
            console.log("Event", state);
          }
        }
      } finally {
        if (that.#debug) {
          console.log("Exit loop", state);
        }
      }
    };

    return eventLoop({ reducer: this.reducer, state: this.state });
  }

  /**
   * Submits a query.
   * @param {{method: string, types: Array}} action
   * @returns {this}
   */
  dispatch(action) {
    if (!this.#batch) {
      // Not batching: evaluate and report immediately.
      this.state = this.#loop.next(action).value;
      this.notifyObservers();
      return this;
    }

    this.#taskQueue.push(action);
    if (!this.#isProcessing) {
      this.#isProcessing = true;
      queueMicrotask(() => this.#batchedTaskQueue());
    }

    return this;
  }

  // Drains the queue in chunks of at most `batchSize` queries per microtask.
  #batchedTaskQueue(batchSize = 1000) {
    let readIndex = 0; // position of the next unprocessed query

    const processBatch = () => {
      let tasksProcessed = 0;

      while (readIndex < this.#taskQueue.length && tasksProcessed < batchSize) {
        this.state = this.#loop.next(this.#taskQueue[readIndex]).value;
        readIndex++;
        tasksProcessed++;
      }

      if (readIndex < this.#taskQueue.length) {
        queueMicrotask(processBatch);
        return;
      }

      // Reset before notifying so a query dispatched by an observer starts a new
      // drain instead of being wiped with the finished queue.
      this.#taskQueue.length = 0;
      this.#isProcessing = false;
      if (this.#debug) console.log("Processed");
      this.notifyObservers();
    };

    processBatch();
  }

  // Helper function to (re)start an event loop; it continues from the current state.
  start() {
    if (this.running) this.stop();
    this.running = true;
    this.#loop = this.createEventLoop();
    this.#loop.next(); // run to the first `yield` so the loop can receive actions
    if (this.#debug) {
      console.log("Init loop", this.state);
    }
  }

  // Helper function to stop the event loop
  stop() {
    this.running = false;
    this.#loop.return();
  }

  /** Calls every observer with its own copy of the accumulated results. */
  notifyObservers() {
    this.#observers.forEach((observer) => observer(this.getState()));
  }

  /**
   * Registers an observer.
   * @param {Function} observer
   * @returns {Function} Call it to unregister the observer again.
   */
  addObserver(observer) {
    this.#observers.add(observer);
    return () => this.removeObserver(observer);
  }

  /** Unregisters an observer; unknown observers are ignored. */
  removeObserver(observer) {
    // Set has no `remove` method; `delete` is the removal API.
    this.#observers.delete(observer);
  }

  // Override .keys() to support filter by types
  keys(...types) {
    return this.dispatch({ method: "keys", types });
  }

  // Override .values() to support filtering by types
  values(...types) {
    return this.dispatch({ method: "values", types });
  }

  // Override .entries() to support filtering by types
  entries(...types) {
    return this.dispatch({ method: "entries", types });
  }

  /**
   * @returns {Array<Array>} Copy of the accumulated result pairs. The copy is shallow
   * (fresh outer array and fresh pairs, same key/value references) because map
   * contents such as functions cannot be passed through `structuredClone`.
   */
  getState() {
    return Array.isArray(this.state) ? this.state.map((pair) => [...pair]) : [];
  }
}

Insert cell
// prototype for BatchMap

/**
 * Prototype event loop for BatchMap; call it with `this` bound to the owner, which
 * must provide `entries()`, `reducer`, `state` (an array) and optionally `debug`.
 *
 * The returned generator walks the owner's entries lazily. For the current entry it
 * repeatedly yields `state` and waits for an action: an object action is passed to
 * `reducer([key, value], action)`; a truthy result is pushed onto `state`, a falsy
 * one moves the loop on to the next entry. Non-object actions are ignored and the
 * loop stays on the same entry.
 *
 * @returns {Generator} Generator yielding the (mutated in place) `state` array.
 */
function createEventLoopProto() {
  const owner = this;

  function* run(reducer, state) {
    try {
      // entries() is only requested once the generator is first resumed.
      for (const [key, value] of owner.entries()) {
        for (;;) {
          const action = yield state;
          let rejected = false;

          if (action && typeof action === "object") {
            const outcome = reducer([key, value], action);
            if (outcome) {
              state.push(outcome);
            } else {
              rejected = true; // this entry failed the query
            }
          }

          if (owner.debug) {
            console.log("Event", action, state);
          }

          // A rejected entry ends the inner loop; continue with the next entry.
          if (rejected) break;
        }
      }
    } finally {
      if (owner.debug) {
        console.log("Exit loop", state);
      }
    }
  }

  // reducer and state are read from the owner at call time, not lazily.
  return run(this.reducer, this.state);
}

Insert cell
// Reactive state getter using a Proxy
// Method to add proxy to .state variable on read
/**
 * Wraps `this._state` in a Proxy that is aware of `this._isProcessing`.
 * Call with `this` bound to the owner (it is written to become a getter).
 *
 * - Reads while idle return the property value directly.
 * - Reads while processing return a Promise that resolves with the property value
 *   once `_isProcessing` is falsy; the flag is re-checked once per microtask.
 * - Writes while processing throw; writes while idle are applied to `_state`.
 *
 * NOTE(review): the wait loop only re-queues microtasks, so it relies on the flag
 * being cleared from a microtask (or synchronously) — confirm against the owner.
 *
 * @returns {Proxy} Proxy over `this._state`.
 * @throws {Error} From the proxy's set trap when written to during processing.
 */
function /*get*/ state() {
  // Single source of truth for the flag, so the read and write guards cannot drift apart.
  const isBusy = () => Boolean(this./*#*/_isProcessing);

  const handler = {
    get: (target, prop) => {
      if (!isBusy()) {
        return Reflect.get(target, prop);
      }
      return new Promise((resolve) => {
        const checkIfSettled = () => {
          if (isBusy()) {
            queueMicrotask(checkIfSettled);
          } else {
            resolve(Reflect.get(target, prop));
          }
        };
        checkIfSettled();
      });
    },
    set: (target, prop, value) => {
      if (isBusy()) {
        // String() keeps the message buildable for symbol-keyed properties.
        throw new Error(
          `State is currently being updated. Property '${String(prop)}' cannot be modified.`
        );
      }
      return Reflect.set(target, prop, value);
    },
  };

  return new Proxy(this._state, handler);
}

Insert cell

Purpose-built for displays of data

Observable is your go-to platform for exploring data and creating expressive data visualizations. Use reactive JavaScript notebooks for prototyping and a collaborative canvas for visual data exploration and dashboard creation.
Learn more