I created the Observable constructor below, which works as described. Is there a more concise way to achieve the same behaviour using the operators that ship with RxJS? I looked at bufferToggle, which comes close, but I need emitted values to pass straight through while the buffer is closed.
Function description: buffers the emitted source values while the most recent condition value is true, and passes them through while it is false. When the condition emits false after having been true, the buffer releases each value in the order it was received. Initially, source values pass through until the condition first emits true.
// RxJS 5 import paths (assumed from the use of .do() and .filter())
import { Observable } from 'rxjs/Observable';
import { Subscription } from 'rxjs/Subscription';
import 'rxjs/add/operator/do';
import 'rxjs/add/operator/filter';

function bufferIf<T>(condition: Observable<boolean>, source: Observable<T>): Observable<T> {
  return new Observable<T>(subscriber => {
    const subscriptions: Subscription[] = [];
    const buffer: T[] = [];
    let isBufferOpen = false;
    subscriptions.push(
      // handle source events (errors and completion are not forwarded)
      source.subscribe(value => {
        // buffer while the buffer is open, or while it is closed but
        // still being emptied after a previous open period
        if (isBufferOpen || buffer.length > 0) {
          buffer.push(value);
        } else {
          subscriber.next(value);
        }
      }),
      // handle condition events
      condition.do(value => isBufferOpen = value)
        .filter(value => !value)
        .subscribe(() => {
          // flush in arrival order; stop if the buffer is re-opened mid-flush
          while (buffer.length > 0 && !isBufferOpen) {
            subscriber.next(buffer.shift() as T);
          }
        })
    );
    // on unsubscribe
    return () => {
      subscriptions.forEach(sub => sub.unsubscribe());
    };
  });
}
In response to a comment, the following is the same function as the one above, but in the form of an RxJS operator and updated to use RxJS 6+ pipeable operators:
import { MonoTypeOperatorFunction, Observable, Subscription } from 'rxjs';
import { filter, tap } from 'rxjs/operators';

function bufferIf<T>(condition: Observable<boolean>): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => {
    return new Observable<T>(subscriber => {
      const subscriptions: Subscription[] = [];
      const buffer: T[] = [];
      let isBufferOpen = false;
      subscriptions.push(
        // handle source events (errors and completion are not forwarded)
        source.subscribe(value => {
          // buffer while the buffer is open, or while it is closed but
          // still being emptied after a previous open period
          if (isBufferOpen || buffer.length > 0) {
            buffer.push(value);
          } else {
            subscriber.next(value);
          }
        }),
        // handle condition events
        condition.pipe(
          tap(con => isBufferOpen = con),
          filter(() => !isBufferOpen)
        ).subscribe(() => {
          // flush in arrival order; stop if the buffer is re-opened mid-flush
          while (buffer.length > 0 && !isBufferOpen) {
            subscriber.next(buffer.shift() as T);
          }
        })
      );
      // on unsubscribe
      return () => subscriptions.forEach(sub => sub.unsubscribe());
    });
  };
}
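To pin down the intended semantics independently of any RxJS version, here is a minimal sketch of the same gate logic in plain TypeScript. `BufferGate` and `demoBufferGate` are hypothetical names introduced only for this illustration; they are not part of RxJS, and the sketch is synchronous, so it models the ordering rules rather than subscriptions or teardown.

```typescript
// Hypothetical helper (not an RxJS API): a synchronous model of bufferIf.
class BufferGate<T> {
  private buffer: T[] = [];
  private isBufferOpen = false; // initial state: pass-through
  private emit: (value: T) => void;

  constructor(emit: (value: T) => void) {
    this.emit = emit;
  }

  // Called for each source value.
  push(value: T): void {
    // Buffer while open, or while a previous flush has not finished,
    // so that arrival order is preserved.
    if (this.isBufferOpen || this.buffer.length > 0) {
      this.buffer.push(value);
    } else {
      this.emit(value);
    }
  }

  // Called for each condition value: true buffers, false flushes.
  setCondition(open: boolean): void {
    this.isBufferOpen = open;
    // Release buffered values in arrival order; stop if re-opened mid-flush.
    while (this.buffer.length > 0 && !this.isBufferOpen) {
      this.emit(this.buffer.shift() as T);
    }
  }
}

// Walks through one open/close cycle and records what reaches the output.
function demoBufferGate(): string[] {
  const log: string[] = [];
  const gate = new BufferGate<number>(v => { log.push('out ' + v); });
  gate.push(1);             // condition has not emitted yet: passes through
  gate.setCondition(true);  // start buffering
  gate.push(2);
  gate.push(3);
  log.push('still buffering');
  gate.setCondition(false); // releases 2, then 3
  gate.push(4);             // passes through again
  return log;
}
```

Calling `demoBufferGate()` returns `['out 1', 'still buffering', 'out 2', 'out 3', 'out 4']`: the value received before the condition emits goes straight through, the two values received while the condition is true are held back, and they are released in order as soon as the condition emits false.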
Here's another way, slightly briefer (adding a new answer as the previous one is rather busy):
// Source and buffering observables
const source$ = Rx.Observable.timer(0, 200).take(15)
const bufferIt$ = Rx.Observable.timer(0, 500).map(x => x % 2 !== 0).take(6)

// Share a single subscription to the source through a Subject
const makeHot$ = (src) => {
  const hot$ = new Rx.Subject();
  src.subscribe(x => hot$.next(x));
  return hot$;
}

// Buffered output
const buffered$ = (source, bufferIt) => {
  const hot$ = makeHot$(source)
  const close = new Rx.Subject()
  return bufferIt
    .concat(Rx.Observable.of(false)) // ensure last buffer emits
    .do(x => { if (!x) close.next(true) }) // close previous buffer
    .switchMap(x => x ? hot$.buffer(close) : hot$.map(x => [x]))
    .mergeAll()
}

// Proof
const start = new Date()
const outputDisplay = buffered$(source$, bufferIt$).timestamp()
  .map(x => 'value: ' + x.value + ', elapsed: ' + (x.timestamp - start))
const bufferDisplay = bufferIt$.timestamp()
  .map(x => (x.value ? 'buffer on' : 'buffer off') + ', elapsed: ' + (x.timestamp - start))
bufferDisplay.merge(outputDisplay)
  .subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>