Published
Edited
Apr 3, 2021
Importers
1 star
Insert cell
Insert cell
// Use rx.of(rx.defer()) + op.mergeAll() to only start requests after the rate limiter.
// Otherwise the requests will be started before and rateLimiter will limit only the processing of their results.
rx
.of(rx.defer(_ => getData(1)))
.pipe(
rateLimiter,
op.mergeAll()
)
.toPromise()
Insert cell
RXall(_ => rx.range(0, 10).pipe(rateLimiter))
Insert cell
RXall(_ => rx.range(0, 10).pipe(rateLimiter))
Insert cell
RXall(_ => rx.range(20, 20).pipe(rateLimiter))
Insert cell
RXall(_ => rx.defer(_ => rx.range(0, 10)).pipe(rateLimiter))
Insert cell
RXall(_ =>
rx.from(d3.range(0, 10).map(i => rx.defer(_ => getData(i)))).pipe(
rateLimiter,
op.mergeAll()
)
)
Insert cell
RXall(_ =>
rx.from(d3.range(20, 30).map(i => rx.defer(_ => getData(i)))).pipe(
rateLimiter,
op.mergeAll()
)
)
Insert cell
rateLimiter = rateLimit(3, 1000)
Insert cell
// https://github.com/gsipos/rxjs-ratelimit/blob/master/src/rxjs-ratelimit.ts
// https://aliz.ai/rate-limiting-in-rxjs/
function rateLimit(count, slidingWindowTime) {
let tokens = count;
const tokenChanged = new rx.BehaviorSubject(tokens);
const consumeToken = () => tokenChanged.next(--tokens);
const renewToken = () => tokenChanged.next(++tokens);
const availableTokens = tokenChanged.pipe(op.filter(() => tokens > 0));

return function(source) {
return source.pipe(
op.mergeMap(value =>
availableTokens.pipe(
op.take(1),
op.map(() => {
consumeToken();
rx.timer(slidingWindowTime).subscribe(renewToken);
return value;
})
)
)
);
};
}
Insert cell
function getData(x) {
return d3
.json(`https://httpbin.org/get?id=${x}&time=${new Date().toISOString()}`)
.then(i => i.args);
}
Insert cell
Insert cell
Insert cell
Insert cell

Purpose-built for displays of data

Observable is your go-to platform for exploring data and creating expressive data visualizations. Use reactive JavaScript notebooks for prototyping and a collaborative canvas for visual data exploration and dashboard creation.
Learn more