function* pipe(iterable, op) {
let subscriber;
let pool = [];
new rx.Observable((s) => (subscriber = s))
.pipe(op)
.subscribe((v) => pool.push(v));
for (const value of iterable) {
subscriber.next(value);
yield* pool;
if (subscriber.closed) {
return;
}
pool = [];
}
subscriber.complete();
}