function rateLimit(count, slidingWindowTime) {
let tokens = count;
const tokenChanged = new rx.BehaviorSubject(tokens);
const consumeToken = () => tokenChanged.next(--tokens);
const renewToken = () => tokenChanged.next(++tokens);
const availableTokens = tokenChanged.pipe(op.filter(() => tokens > 0));
return function(source) {
return source.pipe(
op.mergeMap(value =>
availableTokens.pipe(
op.take(1),
op.map(() => {
consumeToken();
rx.timer(slidingWindowTime).subscribe(renewToken);
return value;
})
)
)
);
};
}