Published
Edited
Oct 2, 2019
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell
/**
 * Groups the values of `stream` into arrays of `n` consecutive items.
 *
 * A running buffer is accumulated with `scan`, and only buffers that have
 * reached exactly `n` items are passed on. A trailing buffer holding fewer
 * than `n` items is therefore never emitted by this version.
 *
 * @param stream source exposing chainable `scan` and `filter` operators
 * @param {number} n number of items per emitted buffer
 * @returns the result of the `scan`/`filter` chain: one array per full buffer
 */
bufferCount =
  (stream, n) =>
    stream
      // Grow the current buffer until it holds n items, then start a new one at x.
      .scan((acc, x) => (acc.length < n ? [...acc, x] : [x]), [])
      // Keep only completed buffers; compare with n, not a fixed size.
      .filter((arr) => arr.length === n)
Insert cell
// Demo: run bufferCount over obs.range(1,10) with a buffer size of 3.
// NOTE(review): `obs` and `.view` are defined outside this excerpt; `.view({size:40})`
// presumably renders the resulting stream — confirm.
bufferCount( obs.range(1,10), 3).view({size:40})
Insert cell
Insert cell
/**
 * Groups the values of `stream` into arrays of `n` consecutive items and,
 * when the source completes, also emits the trailing partial buffer.
 *
 * The stream of running buffers is materialized so that completion becomes an
 * ordinary item; a second `scan` remembers the latest buffer and decides
 * whether it should be emitted.
 *
 * Assumes RxJS-style notifications from `materialize()`: `kind` is 'N', 'E'
 * or 'C', with the payload in `value` (or `error` for 'E') — confirm against
 * the `obs` wrapper in use.
 *
 * @param stream source exposing chainable `scan`, `materialize`, `filter`, `map`
 * @param {number} n number of items per emitted buffer
 * @returns the result of the operator chain: full buffers, then any remainder
 */
bufferCount2 =
  (stream, n) =>
    stream
      // Grow the current buffer until it holds n items, then start a new one at x.
      .scan((acc, x) => (acc.length < n ? [...acc, x] : [x]), [])
      .materialize()
      .scan(
        (acc, event) => {
          // Surface the source's own error instead of failing on `event.value.length`.
          if (event.kind === 'E') {
            throw event.error;
          }
          if (event.kind === 'C') {
            // On completion, re-emit the last buffer only if it is a non-empty
            // remainder; a full buffer was already emitted when it filled up.
            const pending = acc.event.value;
            return { event: acc.event, keep: pending.length > 0 && pending.length < n };
          }
          return { event, keep: event.value.length === n };
        },
        // The seed carries an empty buffer so an empty source completes cleanly.
        { event: { value: [] } },
      )
      .filter((x) => x.keep)
      .map((x) => x.event.value)
Insert cell
// Demo: run bufferCount2 over obs.range(1,10) with a buffer size of 3.
// NOTE(review): `obs` and `.view` are defined outside this excerpt; `.view({size:40})`
// presumably renders the resulting stream — confirm.
bufferCount2( obs.range(1,10), 3).view({size:40})
Insert cell
// Experiment: split obs.range(1,10) into windows of 3 values, each window being
// a ReplaySubject that receives the values belonging to it.
// NOTE(review): `obs`, `rx` and `.values()` are defined outside this excerpt.
obs.range(1,10)
  .scan(
    (acc, x) => {
      // Every third value opens a fresh window; otherwise keep filling the current one.
      const current = acc.i % 3 === 0 ? new rx.ReplaySubject() : acc.window;
      // Push x in both cases so the value that opens a window is not lost.
      current.next(x);
      return { window: current, i: acc.i + 1 };
    },
    { i: 0 },
  )
  .pluck('window')
  // The same subject is produced for each value of a window; pass it on only once.
  .distinctUntilChanged()
  // NOTE(review): the windows are never completed here.
  .values()
Insert cell
/**
 * Groups the values of `stream` into arrays of up to `n` consecutive items by
 * filling each buffer array in place.
 *
 * The accumulator returns the same array reference while a buffer is being
 * filled, so `distinctUntilChanged()` lets each buffer through only once
 * (assuming its default comparison is by reference — confirm for the `obs`
 * wrapper). Consumers receive a buffer when its first item arrives, and that
 * array keeps growing afterwards.
 *
 * @param stream source exposing chainable `scan` and `distinctUntilChanged`
 * @param {number} n maximum number of items per buffer
 * @returns the result of the operator chain: one (mutable) array per buffer
 */
bufferCount3 =
  (stream, n) =>
    stream
      .scan((acc, x) => {
        // `null` seed: the first buffer is created on demand, so no array
        // literal is shared (and mutated) across separate runs of the chain.
        if (acc === null || acc.length >= n) {
          return [x];
        }
        acc.push(x);
        return acc;
      }, null)
      .distinctUntilChanged()
Insert cell
// Demo: run bufferCount3 over obs.range(1,10) with a buffer size of 3.
// NOTE(review): `obs` and `.values()` are defined outside this excerpt; `.values()`
// presumably collects the emitted buffers for display — confirm.
bufferCount3(obs.range(1,10), 3).values()
Insert cell
Insert cell
Insert cell
// Source stream used by the subscription cells below.
// NOTE(review): `obs.interval` is defined outside this excerpt; presumably it emits
// once every 1000 ms — confirm.
seconds$ = obs.interval(1000)
Insert cell
// Displays `value`.
// NOTE(review): `value` is defined outside this excerpt; presumably it is what the
// `set` callback subscribed to seconds$ updates — confirm.
value
Insert cell
sub = {
var s = seconds$.subscribe(set);
invalidation.then(() => s.unsubscribe());
return s;
}
Insert cell
Insert cell
// After 10000 ms, stop the `sub` subscription by calling its unsubscribe().
setTimeout( () => sub.unsubscribe(), 10000 )
Insert cell
Insert cell
Insert cell
Insert cell
Insert cell

Purpose-built for displays of data

Observable is your go-to platform for exploring data and creating expressive data visualizations. Use reactive JavaScript notebooks for prototyping and a collaborative canvas for visual data exploration and dashboard creation.
Learn more