Published
Edited
Aug 16, 2020
4 stars
Insert cell
Insert cell
Insert cell
Insert cell
// Request the first two GitHub users and hand the resulting AJAX observable to RXall.
RXall(() => {
  const usersUrl = 'https://api.github.com/users?per_page=2';
  return rx.ajax.ajax(usersUrl);
})
Insert cell
Insert cell
// Build an observable by hand: emit two strings synchronously, then complete.
// Uses the Observable constructor directly; the static `Observable.create`
// helper is deprecated in RxJS 6 in favour of `new Observable(...)`.
RXall(
  () =>
    new rx.Observable(observer => {
      observer.next('Hello');
      observer.next('World');
      observer.complete();
    })
)
Insert cell
Insert cell
{
// s1 wraps a Date that is created right here, while this cell body runs,
// so the timestamp is already fixed before anyone subscribes.
const s1 = rx.of(new Date());
// s2 postpones `new Date()`: the factory passed to defer runs at the
// moment of subscription, so the timestamp is captured then.
const s2 = rx.defer(() => rx.of(new Date()));

return RXall(() =>
rx.timer(2000).pipe(
// once the 2 second timer fires, switch to both sources merged together
op.switchMap(_ => rx.merge(s1, s2)),
// this Date is created while the pipeline is being built,
// i.e. before the timer fires
op.startWith(new Date())
)
);
}
Insert cell
Insert cell
RXall(_ => rx.empty())
Insert cell
Insert cell
RXall(
_ =>
rx
.from(Promises.delay(100, [1, 2, 3]))
.pipe(op.concatMap(v => rx.of(...v))),
{ logMS: true }
)
Insert cell
RXall(_ => rx.from([1, 2, 3]))
Insert cell
Insert cell
Insert cell
RXall(_ => rx.generate(2, x => x <= 38, x => x + 3, x => '.'.repeat(x)))
Insert cell
Insert cell
RXall(_ => rx.interval(1000).pipe(op.take(10)))
Insert cell
Insert cell
RXall(_ => rx.of(1, 2, 3, 4, 5))
Insert cell
RXall(_ =>
rx.of({ name: 'Brian' }, [1, 2, 3], function hello() {
return 'Hello';
})
)
Insert cell
Insert cell
RXall(_ => rx.range(1, 10))
Insert cell
Insert cell
RXall(_ => rx.throwError('this is an error'))
Insert cell
Insert cell
RXall(_ => rx.timer(1000))
Insert cell
RXall(_ =>
// timer takes a second argument: how often to emit subsequent values.
// In this case the first value is emitted after 1 second and
// subsequent values every 2 seconds after that; take(10) stops after ten values.
rx.timer(1000, 2000).pipe(op.take(10))
)
Insert cell
Insert cell
Insert cell
RXall(_ => rx.throwError(new Error('oh no')))
Insert cell
RXall(_ =>
rx
.throwError(new Error('oh no'))
.pipe(op.catchError(err => rx.of(err.message)))
)
Insert cell
Insert cell
RXall(_ =>
rx.interval(100).pipe(
op.mergeMap(v => {
// every value above 2 is turned into an error
if (v > 2) {
throw v;
// we can return throwError instead
// return rx.throwError(v);
}
return rx.of(v);
}),
// on error, resubscribe to the source (restarting the interval), at most 2 times
op.retry(2)
)
)
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.mergeMap(v => {
if (v > 2) throw v;
return rx.of(v);
}),
op.retryWhen(errors =>
errors.pipe(
op.tap(v => console.log(`value ${v} was too high`)),
op.delayWhen(v => rx.timer(3000))
)
),
// otherwise it'll retry indefinitely
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
Insert cell
// Two outer ticks (one per second) are each mapped to an inner five-value
// interval; combineAll then flattens the stream of inner observables.
RXall(_ => {
  // formats one inner value together with the outer tick that spawned it
  const describeTick = outer => inner => `Result (${outer}): ${inner}`;
  // builds the inner stream for a given outer tick
  const toInnerStream = outer =>
    rx.interval(1000).pipe(op.map(describeTick(outer)), op.take(5));

  return rx.interval(1000).pipe(
    op.take(2),
    op.map(toInnerStream),
    op.combineAll()
  );
})
Insert cell
Insert cell
RXall(_ =>
rx
.combineLatest(rx.timer(100, 400), rx.timer(200, 400), rx.timer(300, 400))
.pipe(op.take(10))
)
Insert cell
RXall(_ =>
rx
.combineLatest(
rx.timer(1000, 4000),
rx.timer(2000, 4000),
rx.timer(3000, 4000),
(one, two, three) => `One: ${one}, Two: ${two}, Three: ${three}`
)
.pipe(op.take(10))
)
Insert cell
Insert cell
RXall(_ => rx.concat(rx.of(1, 2, 3), rx.of(4, 5, 6), rx.of(7, 8, 9)))
Insert cell
RXall(
_ =>
rx.concat(
rx.of(1, 2, 3),
rx.empty().pipe(op.delay(100)),
rx.of(4, 5, 6),
rx.empty().pipe(op.delay(200)),
rx.of(7, 8, 9)
),
{
logMS: true
}
)
Insert cell
Insert cell
RXall(_ =>
rx.interval(100).pipe(
// wrap every tick in its own inner observable
op.map(v => rx.of(v + 10)),
// flatten the inner observables one after another (concatenation, in order)
op.concatAll(),
op.take(10)
)
)
Insert cell
Insert cell
// endWith appends its arguments after the source values, so the appended
// marker is labelled 'end' ('begin' is the label used with startWith).
RXall(_ => rx.of(1, 2, 3).pipe(op.endWith('end')))
Insert cell
Insert cell
RXall(_ =>
rx.of(1, 2, 3).pipe(
op.endWith('end'),
op.startWith('begin')
)
)
Insert cell
Insert cell
RXall(_ => rx.forkJoin(rx.of('hello'), rx.of('world').pipe(op.delay(1000))))
Insert cell
Insert cell
RXall(
_ =>
rx
.merge(
rx.interval(1000).pipe(op.mapTo('first')),
rx.interval(2000).pipe(op.mapTo('second')),
rx.interval(3000).pipe(op.mapTo('third'))
)
.pipe(op.take(10)),
{ logMS: true }
)
Insert cell
RXall(
_ =>
rx
.empty()
.pipe(
op.merge(
rx.interval(1000).pipe(op.mapTo('first')),
rx.interval(2000).pipe(op.mapTo('second')),
rx.interval(3000).pipe(op.mapTo('third'))
)
)
.pipe(op.take(10)),
{ logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx.of(1, 2, 3, 4, 5).pipe(
op.map(v => Promises.delay((5 - v) * 1000, v)),
// without mergeAll() we'll have Promises in the stream
op.mergeAll()
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(_ =>
rx.interval(1000).pipe(
op.pairwise(),
op.take(10)
)
)
Insert cell
Insert cell
RXall(
_ =>
rx
.race(
rx.interval(1000).pipe(op.map(v => `first ${v}`)),
rx.interval(1100).pipe(op.mapTo('second')),
rx.interval(1200).pipe(op.mapTo('third')),
rx.interval(1300).pipe(op.mapTo('fourth'))
)
.pipe(op.take(10)),
{ logMS: true }
)
Insert cell
Insert cell
RXall(_ =>
rx.interval(1000).pipe(
op.withLatestFrom(rx.interval(100)),
op.map(([a, b]) => `${a} - ${b}`),
op.take(10)
)
)
Insert cell
RXall(_ =>
rx.interval(100).pipe(
op.withLatestFrom(rx.interval(300)),
op.map(([a, b]) => `${a} - ${b}`),
op.take(10)
)
)
Insert cell
Insert cell
RXall(_ =>
rx.zip(
rx.of('foo1', 'foo2', 'foo3'),
rx.of('bar1', 'bar2'),
rx.of('baz1', 'baz2')
)
)
Insert cell
Insert cell
Insert cell
RXall(_ => rx.of().pipe(op.defaultIfEmpty('default')))
Insert cell
Insert cell
RXall(_ => rx.of(1, 2, 3, 4).pipe(op.every(v => !(v & 1))))
Insert cell
RXall(_ => rx.of(2, 4).pipe(op.every(v => !(v & 1))))
Insert cell
Insert cell
RXall(_ =>
rx.interval(100).pipe(
op.mergeMap(v => rx.iif(_ => v & 1, rx.of(`odd ${v}`), rx.of(`even ${v}`))),
op.take(10)
)
)
Insert cell
RXall(_ =>
rx.interval(100).pipe(
op.mergeMap(v => rx.iif(_ => v & 1, rx.of(`odd ${v}`))),
op.take(10)
)
)
Insert cell
Insert cell
{
const expectedSequence = rx.of(4, 5, 6);
return RXall(_ =>
rx
.of([1, 2, 3], [4, 5, 6], [7, 8, 9])
.pipe(
op.switchMap(arr =>
rx.from(arr).pipe(op.sequenceEqual(expectedSequence))
)
)
);
}
Insert cell
{
// the sequence we are looking for inside the longer stream below
const expectedSequence = rx.of(5, 6, 7);
return RXall(_ =>
rx.of(1, 2, 3, 4, 4, 5, 6, 7, 8, 9).pipe(
// Start a new 3-element buffer at every element (a sliding window).
// With non-overlapping buffers we could miss the sequence whenever
// it does not line up with a buffer boundary.
op.bufferCount(3, 1),
// compare each buffer against the expected sequence
op.switchMap(arr => rx.from(arr).pipe(op.sequenceEqual(expectedSequence)))
)
);
}
Insert cell
Insert cell
Insert cell
{
// `resolve` holds the callback that Generators.observe hands to its
// initializer; the tap below calls it to report the updated list.
let resolve;
let vals = [];
const generator = Generators.observe(r => {
resolve = r;
// report the (still empty) list right away
resolve(vals);
});

let count = 0;
const source = rx.interval(100).pipe(
// side effect: record each tick and report the list again
op.tap(v => {
vals.push(++count);
resolve(vals);
}),
op.take(10),
// source will start emitting values only after 'connect()' call.
op.publish()
);

// connect 2 seconds from now; until then the interval is not running
Promises.delay(2000).then(_ => source.connect());

return generator;
}
Insert cell
Insert cell
{
let count = 0;
const source = rx.interval(100).pipe(
// create side-effects to ensure that 'source' stream processes data only once
op.map(_ => ++count),
op.take(20)
);

// Subject:
// All 20 events will arrive only to gen1. gen2 will only receive the events
// that are emitted after it subscribes.
const multi = source.pipe(op.multicast(_ => new rx.Subject()));

const gen1 = RXall(_ => multi);
multi.connect();

// let the source run for a second before the second subscriber joins
await Promises.delay(1000);

const gen2 = RXall(_ => multi, { logMS: true });
return gen2;
}
Insert cell
{
let count = 0;
const source = rx.interval(100).pipe(
// create side-effects to ensure that 'source' stream processes data only once
op.map(_ => ++count),
op.take(20)
);

// ReplaySubject(5):
// gen1 is there from the start and receives all 20 events. gen2 subscribes
// after the 1 second delay: the subject's replay buffer holds only the 5 most
// recent events, so gen2 gets those immediately after subscription and then
// the remaining live events. The earliest events are NOT replayed to gen2.
// similar to shareReplay(5)
const multi = source.pipe(op.multicast(_ => new rx.ReplaySubject(5)));

const gen1 = RXall(_ => multi);
multi.connect();

// let the source run for a second before the second subscriber joins
await Promises.delay(1000);

const gen2 = RXall(_ => multi, { logMS: true });
return gen2;
}
Insert cell
Insert cell
{
let count = 0;
const source = rx.interval(100).pipe(
// create side-effects to ensure that 'source' stream processes data only once
op.map(_ => ++count),
op.take(20)
);

// without '.pipe(op.share())' the side-effects will be executed twice per event
const shared = source.pipe(op.share());

const gen1 = RXall(_ => shared, { logMS: true });
// without this delay both gen1 and gen2 will be the same
await Promises.delay(1000);
const gen2 = RXall(_ => shared, { logMS: true });

return gen2;
}
Insert cell
Insert cell
{
let count = 0;
const source = rx.interval(100).pipe(
// create side-effects to ensure that 'source' stream processes data only once
op.map(_ => ++count),
op.take(20)
);

// without '.pipe(op.shareReplay(5))' the side-effects will be executed twice per event
// shareReplay(5) additionally replays up to the 5 most recent values to a
// late subscriber, similar to multicast(ReplaySubject)
const shared = source.pipe(op.shareReplay(5));

const gen1 = RXall(_ => shared, { logMS: true });
// gen2 joins one second late, so it starts with the replayed values
await Promises.delay(1000);
const gen2 = RXall(_ => shared, { logMS: true });

return gen2;
}
Insert cell
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.auditTime(1000),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.audit(_ => rx.interval(1000)),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx
.concat(
rx.of(0),
rx.empty().pipe(op.delay(50)),
rx.of(1, 2, 3),
rx.empty().pipe(op.delay(100)),
rx.of(4, 5, 6),
rx.empty().pipe(op.delay(100))
)
.pipe(
// need an interval of 100ms between events to report a value
op.debounceTime(100)
),
{ logMS: true }
)
Insert cell
Insert cell
Insert cell
RXall(_ => rx.of(1, 2, 2, 2, 3, 2, 3, 4).pipe(op.distinct()))
Insert cell
RXall(_ =>
rx
.of(
{ id: 1, value: 'hello' },
{ id: 2, value: 'world' },
{ id: 1, value: 'hello again' }
)
.pipe(op.distinct(v => v.id))
)
Insert cell
Insert cell
RXall(_ => rx.of(1, 2, 2, 2, 3, 2, 3, 4).pipe(op.distinctUntilChanged()))
Insert cell
Insert cell
RXall(_ =>
rx
.of(
{ id: 1, value: 'hello' },
{ id: 2, value: 'hello' },
{ id: 2, value: 'world' },
{ id: 3, value: 'hello' }
)
.pipe(op.distinctUntilKeyChanged('id'))
)
Insert cell
Insert cell
RXall(_ =>
rx.range(1, 10).pipe(
// return only odd numbers
op.filter(v => v & 1)
)
)
Insert cell
Insert cell
RXall(_ => rx.range(1, 10).pipe(op.find(v => v > 5)))
Insert cell
Insert cell
RXall(_ => rx.range(1, 10).pipe(op.first()))
Insert cell
RXall(_ => rx.range(1, 10).pipe(op.first(v => v > 5)))
Insert cell
Insert cell
RXall(_ => rx.range(1, 10).pipe(op.last()))
Insert cell
RXall(_ => rx.range(1, 10).pipe(op.last(v => v < 5)))
Insert cell
Insert cell
RXall(_ =>
rx.interval(100).pipe(
op.take(10),
// ignore everything but complete and error
op.ignoreElements()
)
)
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.sample(rx.interval(1000)),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(_ => rx.range(1, 10).pipe(op.single(v => v & 1)))
Insert cell
RXall(_ => rx.range(1, 10).pipe(op.single(v => v === 4)))
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.skip(5),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.skipUntil(rx.timer(550)),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.skipWhile(v => v < 5),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(_ => rx.range(1, 100).pipe(op.take(10)))
Insert cell
Insert cell
RXall(_ => rx.range(1, 100).pipe(op.takeLast(10)))
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.takeUntil(rx.timer(550)),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(_ => rx.interval(100).pipe(op.takeWhile(v => v < 5)), { logMS: true })
Insert cell
Insert cell
RXall(
_ =>
rx
.concat(
rx.of(0),
rx.empty().pipe(op.delay(50)),
rx.of(1, 2, 3),
rx.empty().pipe(op.delay(100)),
rx.of(4, 5, 6),
rx.empty().pipe(op.delay(100))
)
.pipe(
// will take value every 100ms after last value
op.throttleTime(100)
),
{ logMS: true }
)
Insert cell
Insert cell
Insert cell
Insert cell
RXall(_ => rx.range(1, 10).pipe(op.bufferCount(3)))
Insert cell
RXall(_ => rx.range(1, 10).pipe(op.bufferCount(3, 2)))
Insert cell
RXall(_ => rx.range(1, 10).pipe(op.bufferCount(3, 1)))
Insert cell
Insert cell
RXall(
_ =>
rx.interval(10).pipe(
op.bufferTime(100),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.bufferToggle(rx.interval(1000), v => rx.interval(2000)),
op.take(3)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.bufferWhen(v => rx.interval(1000)),
op.take(5)
),
{ logMS: true }
)
Insert cell
Insert cell
Insert cell
RXall(
_ =>
rx
.of(2000, 1000)
// compare with mergeMap
.pipe(op.concatMap(v => rx.of(`Delayed by: ${v}ms`).pipe(op.delay(v)))),
{ logMS: true }
)
Insert cell
Insert cell
// Five quick ticks, each replaced by the same one-second "network request";
// concatMapTo subscribes to the requests one after another.
RXall(
  _ => {
    const fakeRequest = rx.of('Network request complete').pipe(op.delay(1000));
    return rx.interval(100).pipe(op.take(5), op.concatMapTo(fakeRequest));
  },
  { logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx
.of(2000, 1000)
.pipe(op.exhaustMap(v => rx.of(`Delayed by: ${v}ms`).pipe(op.delay(v)))),
{ logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx
.of(2000, 1000)
// compare with concatMap
.pipe(op.mergeMap(v => rx.of(`Delayed by: ${v}ms`).pipe(op.delay(v)))),
{ logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx
.of(2000, 1000)
.pipe(op.switchMap(v => rx.of(`Delayed by: ${v}ms`).pipe(op.delay(v)))),
{ logMS: true }
)
Insert cell
Insert cell
RXall(_ =>
rx.of(1).pipe(
op.expand(v => rx.of(v + 2)),
op.take(10)
)
)
Insert cell
Insert cell
RXall(_ =>
rx
.of(
{ name: 'Sue', age: 25 },
{ name: 'Joe', age: 30 },
{ name: 'Frank', age: 25 },
{ name: 'Sarah', age: 35 }
)
.pipe(
op.groupBy(v => v.age),
op.mergeMap(g => g.pipe(op.toArray()))
)
)
Insert cell
Insert cell
RXall(_ => rx.range(10).pipe(op.map(v => v * 100)))
Insert cell
Insert cell
RXall(_ => rx.range(10).pipe(op.mapTo('yay')))
Insert cell
Insert cell
RXall(_ =>
rx.interval(10).pipe(
op.skip(30),
// count number of events
op.mergeScan((acc, curr) => rx.of(acc + 1), 0),
op.take(10)
)
)
Insert cell
Insert cell
{
// partition splits the stream in two: values for which the predicate is
// truthy (odd numbers here) and the rest (even numbers).
// Only the odd stream is displayed; `even` is not used in this cell.
const [odd, even] = rx.range(10).pipe(op.partition(v => v & 1));
return RXall(_ => odd);
}
Insert cell
Insert cell
RXall(_ =>
rx
.of(
{ name: 'Sue', age: 25 },
{ name: 'Joe', age: 30 },
{ name: 'Frank', age: 25 },
{ name: 'Sarah', age: 35 }
)
.pipe(op.pluck('name'))
)
Insert cell
Insert cell
RXall(_ => rx.range(10).pipe(op.reduce((acc, curr) => acc + curr, 0)))
Insert cell
Insert cell
RXall(_ => rx.range(10).pipe(op.scan((acc, curr) => acc + curr, 0)))
Insert cell
Insert cell
RXall(_ =>
rx.interval(100).pipe(
op.take(10),
op.toArray()
)
)
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.window(rx.interval(1000)),
// take 3 elements from each window
op.map(win => win.pipe(op.take(3))),
// flatten back to a single stream
op.mergeAll(),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.windowCount(10),
// take 3 elements from each window
op.map(win => win.pipe(op.take(3))),
// flatten back to a single stream
op.mergeAll(),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.windowTime(1000),
// take 3 elements from each window
op.map(win => win.pipe(op.take(3))),
// flatten back to a single stream
op.mergeAll(),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.windowToggle(rx.timer(0, 1000), v => rx.interval(1000)),
// take 3 elements from each window
op.map(win => win.pipe(op.take(3))),
// flatten back to a single stream
op.mergeAll(),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.windowWhen(v => rx.interval(1000)),
// take 3 elements from each window
op.map(win => win.pipe(op.take(3))),
// flatten back to a single stream
op.mergeAll(),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
Insert cell
{
// collects every value seen by tap; this array is what the cell returns
const r = [];
// the value returned by RXall is discarded: only the side effects matter here
RXall(_ =>
rx.range(10).pipe(
// we can do arbitrary side-effects here
op.tap(v => r.push(v)),
// the mapped values themselves are not used by this cell
op.mapTo('foo')
)
);
return r;
}
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.delay(1000),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(
_ =>
rx.interval(100).pipe(
op.delayWhen(_ => rx.timer(1000)),
op.take(10)
),
{ logMS: true }
)
Insert cell
Insert cell
RXall(_ =>
rx
.from([
rx.Notification.createNext('SUCCESS!'),
rx.Notification.createError('ERROR!'),
rx.Notification.createNext('ignored after error')
])
.pipe(
// turn notification objects into notification values
op.dematerialize()
)
)
Insert cell
Insert cell
{
// records the finalize callback invocation; this array is what the cell returns
const r = [];
RXall(_ =>
rx.range(10).pipe(
// we can do arbitrary side-effects here
// unlike 'tap', the callback is not called per value: it runs once when
// the stream terminates (completion, error, or unsubscribe)
op.finalize(_ => r.push('complete')),
op.mapTo('foo')
)
);
return r;
}
Insert cell
Insert cell
RXall(_ => rx.of(1, 2, 3).pipe(op.repeat(3)))
Insert cell
Insert cell
RXall(_ =>
rx.timer(100, 1000).pipe(
op.timeInterval(),
op.pluck('interval'),
op.take(10)
)
)
Insert cell
Insert cell
RXall(
_ =>
rx.of(3000, 500, 2000, 1000, 100).pipe(
op.concatMap(v =>
rx.of(`done ${v}`).pipe(
op.delay(v),
op.timeout(1000),
op.catchError(err => rx.of(`timeout ${v}`))
)
)
),
{ logMS: true }
)
Insert cell
Insert cell
// Convert the observable into a Promise: it resolves with the last emitted
// value (3) once the observable completes, here after the 1 second delay.
rx
.of(1, 2, 3)
.pipe(op.delay(1000))
.toPromise()
Insert cell
Insert cell
import { rx, op, RX, RXall, RXlatest } from '@mblsha/rxjs'
Insert cell

Purpose-built for displays of data

Observable is your go-to platform for exploring data and creating expressive data visualizations. Use reactive JavaScript notebooks for prototyping and a collaborative canvas for visual data exploration and dashboard creation.
Learn more