Public
Edited
Jan 2, 2024
Insert cell
Insert cell
// An implementation of the observer pattern
/**
 * A mutable value that notifies subscribed callbacks whenever it changes.
 *
 * Observers are plain functions; each one is called with the new value after
 * a `write` that actually changes the stored value (compared with `!==`).
 */
class Observable {
  /**
   * @param {*} initialValue - Value returned by `read()` until the first `write`.
   */
  constructor(initialValue) {
    this._value = initialValue;
    this.observers = [];
  }

  /**
   * @returns {*} The current value.
   */
  read() {
    return this._value;
  }

  /**
   * Stores a new value and notifies observers, but only when the value is
   * not strictly equal to the current one.
   *
   * @param {*} newValue - The value to store.
   */
  write(newValue) {
    if (newValue === this._value) {
      return;
    }
    this._value = newValue;
    this.notifyObservers();
  }

  /**
   * Registers an observer.
   *
   * @param {function(*): void} observer - Called with the new value on every change.
   * @returns {function(): void} A function that removes this registration.
   *   Calling it more than once is harmless.
   */
  subscribe(observer) {
    this.observers.push(observer);

    let isSubscribed = true;
    return () => {
      // Guard so a second call cannot remove another registration of the
      // same callback.
      if (!isSubscribed) {
        return;
      }
      isSubscribed = false;
      const position = this.observers.indexOf(observer);
      if (position !== -1) {
        this.observers.splice(position, 1);
      }
    };
  }

  /**
   * Calls every registered observer with the current value.
   */
  notifyObservers() {
    // Iterate over a copy: an observer that unsubscribes while being notified
    // would otherwise shift the array and cause the next observer to be skipped.
    // Observers removed mid-round are still called for that round.
    [...this.observers].forEach((observer) => observer(this._value));
  }
}
Insert cell
/**
 * Derives a new Observable from several source observables.
 *
 * Every argument except the last is a source observable; the last argument
 * is a combiner function that receives the sources' latest values, in order,
 * and returns the derived value.
 *
 * @param {...*} inputs - Source observables followed by the combiner function.
 * @returns {Observable} An observable holding the combiner's latest result.
 */
function combineLatest(...inputs) {
  const combine = inputs[inputs.length - 1];
  const sources = inputs.slice(0, -1);

  // Most recent value seen from each source, kept in source order.
  const snapshot = [];
  for (const source of sources) {
    snapshot.push(source.read());
  }

  // Seed the result with the combination of the initial values.
  const result = new Observable(combine(...snapshot));

  // Whenever a source changes, record its value and recompute the result.
  for (let position = 0; position < sources.length; position += 1) {
    sources[position].subscribe((incoming) => {
      snapshot[position] = incoming;
      result.write(combine(...snapshot));
    });
  }

  return result;
}
Insert cell
// This dependency tree is too coarse! combineLatest subscribes to all three
// sources, so the selector below is re-run whenever any of i, t, or e changes,
// even though only one of t / e is selected by i at any given time.
// NOTE(review): i, t and e are not defined in this view; presumably they are
// Observable instances defined in other notebook cells. Confirm.
combineLatest(i, t, e, (i, t, e) => i ? t : e)
Insert cell
/**
 * Maps an observable through a function that itself returns observables and
 * flattens the result: the output always mirrors the most recently selected
 * inner observable.
 *
 * @param {Observable} x - Source observable (Observable<T>).
 * @param {function(*): Observable} f - Maps a source value to an inner
 *   observable (T => Observable<U>).
 * @returns {Observable} An observable tracking the active inner observable's
 *   value (Observable<U>).
 */
function switchAll1(x /* Observable<T> */, f /* T => Observable<U> */) /* => Observable<U> */ {
  const obsobs = combineLatest(x, f); // This produces an Observable<Observable<U>>. i.e. a stream of observables

  // The inner observable the output currently mirrors.
  let activeInner = obsobs.read();

  // THIS IS JOIN/FLATTEN!
  const obs = new Observable(activeInner.read());

  // Inner observables that already carry our forwarding callback, so that
  // switching back to an earlier one does not register a second callback.
  const followed = new WeakSet();

  const follow = (innerObs) => {
    if (followed.has(innerObs)) {
      return;
    }
    followed.add(innerObs);
    innerObs.subscribe((value) => {
      // This does not rely on an unsubscribe handle: callbacks on inner
      // observables we switched away from stay registered but are muted here.
      if (innerObs === activeInner) {
        obs.write(value);
      }
    });
  };

  // The initially selected inner observable must be followed too, otherwise
  // its changes never reach the output.
  follow(activeInner);

  // when the selected inner observable changes...
  obsobs.subscribe((innerObs) => {
    activeInner = innerObs;
    // adopt the new inner observable's current value right away
    obs.write(innerObs.read());
    // and forward its future changes
    follow(innerObs);
  });

  return obs;
}
Insert cell
/**
 * Variadic flatten: combines the given sources with a function that returns
 * an observable, and exposes a single observable that always mirrors the
 * most recently selected inner observable.
 *
 * @param {...*} inputs - Source observables followed by a function that maps
 *   their latest values to an inner observable (forwarded to combineLatest).
 * @returns {Observable} An observable tracking the active inner observable's value.
 */
function switchAll(...inputs) {
  const obsobs = combineLatest(...inputs); // This produces an Observable<Observable<U>>. i.e. a stream of observables

  // The inner observable the output currently mirrors.
  let activeInner = obsobs.read();

  // THIS IS JOIN/FLATTEN!
  const obs = new Observable(activeInner.read());

  // Inner observables that already carry our forwarding callback, so that
  // switching back to an earlier one does not register a second callback.
  const followed = new WeakSet();

  const follow = (innerObs) => {
    if (followed.has(innerObs)) {
      return;
    }
    followed.add(innerObs);
    innerObs.subscribe((value) => {
      // This does not rely on an unsubscribe handle: callbacks on inner
      // observables we switched away from stay registered but are muted here.
      if (innerObs === activeInner) {
        obs.write(value);
      }
    });
  };

  // The initially selected inner observable must be followed too, otherwise
  // its changes never reach the output.
  follow(activeInner);

  // when the selected inner observable changes...
  obsobs.subscribe((innerObs) => {
    activeInner = innerObs;
    // adopt the new inner observable's current value right away
    obs.write(innerObs.read());
    // and forward its future changes
    follow(innerObs);
  });

  return obs;
}
Insert cell
// An implementation of the observer pattern
/**
 * Observable value that additionally keeps a list of `sources` so it can
 * detach itself from them via `removeParentObservers`.
 */
class ObservableV2 {
  /**
   * @param {*} initialValue - Value returned by `read()` until the first `write`.
   */
  constructor(initialValue) {
    this._value = initialValue;
    this.observers = [];
    this.sources = [];
  }

  /**
   * @returns {*} The current value.
   */
  read() {
    return this._value;
  }

  /**
   * Stores a new value and notifies observers, but only when the value is
   * not strictly equal to the current one.
   *
   * @param {*} newValue - The value to store.
   */
  write(newValue) {
    if (newValue !== this._value) {
      this._value = newValue;
      this.notifyObservers();
    }
  }

  // Method to add an observer
  subscribe(observer) {
    this.observers.push(observer);
  }

  // Method to notify all observers about the value change
  notifyObservers() {
    this.observers.forEach(observer => observer(this._value));
  }

  // from reactively
  /**
   * Removes this instance from the `observers` list of each source, starting
   * at position `index` in `this.sources`.
   *
   * NOTE(review): observers added through `subscribe` are invoked as functions,
   * while this method looks for `this` itself in `source.observers`. Nothing in
   * this class registers `this` on a source; presumably that happens elsewhere.
   * Confirm.
   *
   * @param {number} index - First position in `this.sources` to detach from.
   */
  removeParentObservers(index) {
    if (!this.sources) return;
    for (let i = index; i < this.sources.length; i++) {
      const source = this.sources[i]; // We don't actually delete sources here because we're replacing the entire array soon
      const swap = source.observers.findIndex((v) => v === this);
      // Not registered on this source: without this guard the code below would
      // write to index -1 and pop an unrelated observer off the end.
      if (swap === -1) continue;
      // Swap-remove: overwrite our slot with the last observer, then drop the last slot.
      source.observers[swap] = source.observers[source.observers.length - 1];
      source.observers.pop();
    }
  }
}
Insert cell

Purpose-built for displays of data

Observable is your go-to platform for exploring data and creating expressive data visualizations. Use reactive JavaScript notebooks for prototyping and a collaborative canvas for visual data exploration and dashboard creation.
Learn more