class Batch {
#loop;
#taskQueue = [];
#isProcessing = false;
constructor({ reducer, initialState, debug, batch, observers }) {
this.state = initialState ?? { count: 0, okay: true };
this.reducer = reducer ?? this.defaultReducer;
this.observers = new Set(observers);
this.debug = debug ?? false;
this.batch = batch ?? true;
this.start();
}
defaultReducer = (state, action) => {
switch (action?.type) {
case "increment":
return { ...state, count: state.count + 1 };
case "decrement":
return { ...state, count: state.count - 1 };
case "value":
return { ...state } ;
default:
return state;
}
};
start() {
if (this.running) this.stop();
if (this.debug) {
console.log("Init loop", this.state);
}
this.running = true;
this.#loop = this.createEventLoop();
this.consumer = this.createConsumer();
this.#loop.next();
this.consumer.next();
}
stop() {
this.running = false;
this.#loop.return();
this.consumer.return();
}
createEventLoop() {
let that = this;
const eventLoop = function* ({ reducer, state }) {
try {
while (true) {
const action = yield state; // where the magic happens => action takes the value passed to .next()
state = reducer(state, action);
that.consumer.next(state); // How to pass state to consumer??
if (that.debug) {
console.log("Iteration", state);
}
}
} finally {
if (that.debug) {
console.log("Exit loop", state);
}
}
}; //.bind(this); // some performance overhead => pass `this` instead
return eventLoop({ reducer: this.reducer, state: this.state });
}
// Example Consumer generator that yields the latest state
createConsumer() {
let that = this;
const consumer = function* (state) {
try {
while (true) {
const newState = yield state; // Yield the current state
state = newState ?? that.getState(); // Update the state when notified by the event loop
}
} finally {
if (that.debug) {
console.log("Exit consumer", state);
}
}
};
return consumer(this.getState());
}
// Blocking Version => MVP
/*dispatch(action) {
if (this.batch) this.#taskQueue.push(action);
if (this.#taskQueue.length === 1) {
queueMicrotask(() => {
this.#taskQueue.forEach(
(task) => (this.state = this.#loop.next(task).value)
);
if (this.debug) console.log("Processed");
this.notifyObservers();
this.#taskQueue.length = 0;
});
} else if (!this.batch) {
this.state = this.#loop.next(action).value;
this.notifyObservers();
}
return this;
}*/
// Non-blocking version => Performant
dispatch(action) {
if (this.batch) this.#taskQueue.push(action);
if (this.#taskQueue.length === 1) {
// Start processing the queue asynchronously
// Method 1
// queueMicrotask(() => this.#processTaskQueue());
// Method 2
queueMicrotask(() => this.#batchedTaskQueue());
} else if (!this.batch) {
// Process single action immediately if not batching
this.state = this.#loop.next(action).value;
this.notifyObservers();
}
return this;
}
// Method 1:
// Asynchronous queue processor
#processTaskQueue() {
const processNextTask = () => {
if (this.#taskQueue.length === 0) {
// Queue is empty, notify observers and reset
if (this.debug) console.log("Processed");
this.notifyObservers();
return;
}
// Process one task at a time
const task = this.#taskQueue.shift();
this.state = this.#loop.next(task).value;
// Schedule the next task asynchronously
queueMicrotask(processNextTask);
};
// Start processing the first task
processNextTask();
}
// Method 2:
// Asynchronous batched queue processor
#batchedTaskQueue(batchSize = 512) {
let readIndex = 0; // Track the current position in the queue
const processBatch = () => {
let tasksProcessed = 0;
// Process up to `batchSize` tasks in this microtask
while (readIndex < this.#taskQueue.length && tasksProcessed < batchSize) {
const task = this.#taskQueue[readIndex];
this.state = this.#loop.next(task).value;
readIndex++;
tasksProcessed++;
}
if (readIndex < this.#taskQueue.length) {
// Schedule the next batch asynchronously
queueMicrotask(processBatch);
} else {
// Queue is fully processed, notify observers and reset
if (this.debug) console.log("Processed");
this.notifyObservers();
// Reset the queue and readIndex
this.#taskQueue.length = 0;
readIndex = 0;
}
};
// Start processing the first batch
processBatch();
}
notifyObservers() {
if (this.debug) console.log("Forwarded");
this.observers.forEach((observer) => observer(this.getState()));
}
addObserver(observer) {
this.observers.add(observer);
return () => this.removeObserver(observer);
}
removeObserver(observer) {
this.observers.remove(observer); // = this.observers.filter((obs) => obs !== observer);
}
getState() {
return structuredClone(this.state);
}
settle() {
return new Promise((keep) => {
setTimeout(() => keep(this.getState()));
});
}
readState() {
return this.consumer.next().value;
}
}