a: 1---2-3-4--5---6
b: ------T---------
o: ------1234-5---6
Using RxJS, is there some operator that can accomplish the diagram above? I have stream A which is a random stream of events, given a stream B which has a single true event, can I have an output stream that doesn't emit anything until that true event, and then sends everything is had saved up until then and afterwards emits normally?
I thought maybe I could use buffer(), but it seems like there is no way to do a one time buffer like this with that operator.
const { concat, interval, of, from } = rxjs;
const { share, delay, toArray, takeUntil, mergeMap } = rxjs.operators;
const waitUntil = signal$ => source$ => {
const sharedSource$ = source$.pipe(share());
return concat(
sharedSource$.pipe(
takeUntil(signal$),
toArray(),
mergeMap(from)
),
sharedSource$
);
}
const stopWaiting$ = of('signal').pipe(delay(2000));
const source$ = interval(500).pipe(
waitUntil(stopWaiting$)
).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>
I think @ZahiC's solution is correct but personally I'd do it in a single chain using the multicast operator.
a$.pipe(
multicast(new Subject(), s => concat(
s.pipe(
buffer(b$),
take(1),
),
s
)),
)
multicast will basically spit the stream into two where concat will first subscribe to the first one that is buffered until b$ emits. Then it completes immediately because of take(1) and concat subscribe to the same steam again but this time unbuffered.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With