Hi, I'm looking for an RxJS operator that behaves similarly to withLatestFrom, except that it waits for the second stream to emit a value instead of skipping the emission. To be clear: I only want emissions when the first stream emits a new value.
So instead of:
---A-----B----C-----D-|
------1----------2----|
withLatestFrom
---------B1---C1----D2|
I want this behavior:
---A-----B----C-----D-|
------1----------2----|
?????????????
------A1-B1---C1----D2|
Is there an operator for this?
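To pin down the semantics being asked for, here is a dependency-free sketch that replays both timelines as timestamped events. It is not RxJS code: the `Ev` type, the `simulate` function and the tick numbers are hypothetical, made up for illustration. It also assumes that first-stream values arriving before the second stream's first value are queued rather than dropped, which the diagrams above don't actually specify.

```typescript
// Hypothetical event model: `t` is an arbitrary tick read off the marble diagrams.
type Ev = { t: number; v: string };

// Replays both timelines in time order and collects the combined emissions.
function simulate(first: Ev[], second: Ev[], waitForSecond: boolean): string[] {
  const timeline = [
    ...first.map(e => ({ ...e, src: 'first' as const })),
    ...second.map(e => ({ ...e, src: 'second' as const })),
  ].sort((a, b) => a.t - b.t);

  const out: string[] = [];
  const pending: string[] = [];   // first-stream values still waiting for the second stream
  let latest: string | undefined; // most recent second-stream value

  for (const e of timeline) {
    if (e.src === 'second') {
      latest = e.v;
      // Flush whatever was waiting (assumption: queued, not dropped).
      while (pending.length > 0) out.push(pending.shift()! + latest);
    } else if (latest !== undefined) {
      out.push(e.v + latest);
    } else if (waitForSecond) {
      pending.push(e.v);
    }
    // Otherwise (withLatestFrom-like): the value is skipped.
  }
  return out;
}

const first: Ev[] = [{ t: 3, v: 'A' }, { t: 9, v: 'B' }, { t: 14, v: 'C' }, { t: 20, v: 'D' }];
const second: Ev[] = [{ t: 6, v: '1' }, { t: 17, v: '2' }];

const skipped = simulate(first, second, false); // withLatestFrom-like
const waited = simulate(first, second, true);   // the behavior I want
console.log(skipped.join(' '), '|', waited.join(' ')); // B1 C1 D2 | A1 B1 C1 D2
```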
Smola came up with a nice and clean solution in the comments that simply uses the distinctUntilKeyChanged operator:
// Array form; the variadic combineLatest(a$, b$) signature is deprecated.
combineLatest([first$, second$])
  .pipe(distinctUntilKeyChanged(0))
Visualized in RxViz, this produces the desired result.
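For anyone who wants to check this without RxViz, here is a minimal sketch that drives the same pipeline with two Subjects, emitting synchronously in the order of the diagram. The Subjects, the `seen` array and the extra `map` that joins each pair into a string are my own additions for illustration.

```typescript
import { Subject, combineLatest } from 'rxjs';
import { distinctUntilKeyChanged, map } from 'rxjs/operators';

const first$ = new Subject<string>();
const second$ = new Subject<string>();
const seen: string[] = [];

combineLatest([first$, second$])
  .pipe(
    // Index 0 of each [first, second] pair: only pass pairs where it changed.
    distinctUntilKeyChanged(0),
    map(([a, b]) => `${a}${b}`),
  )
  .subscribe(v => seen.push(v));

// Same order of events as in the diagram.
first$.next('A');
second$.next('1'); // first pair is complete: A1
first$.next('B');  // B1
first$.next('C');  // C1
second$.next('2'); // [C, 2] is filtered out, index 0 is still C
first$.next('D');  // D2

console.log(seen.join(' ')); // A1 B1 C1 D2
```

Two things to keep in mind with this approach: distinctUntilKeyChanged compares with ===, so two identical consecutive values from the first stream collapse into a single emission, and combineLatest only keeps the latest value per source, so if the first stream emits several values before the second stream emits at all, only the last of them gets paired.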

I don't think there's an operator that does exactly this, but you can achieve this result by combining a higher-order mapping operator with a Subject:
second$ = second$.pipe(shareReplay({ bufferSize: 1, refCount: false }));
first$.pipe(
  concatMap(
    firstVal => second$.pipe(
      map(secondVal => `${firstVal}${secondVal}`),
      take(1),
    )
  )
)
shareReplay places a ReplaySubject in front of the data producer. This means that it will replay the latest N (bufferSize) values to every new subscriber. refCount: false makes sure that the ReplaySubject in use won't be destroyed when there are no more active subscribers.
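To see what refCount changes in practice, here is a minimal sketch, assuming a version of RxJS where shareReplay accepts a config object (6.4 or later). The `probe` helper, the `producer$` Subject and the subscription counter are hypothetical names I made up for the demonstration.

```typescript
import { Observable, Subject } from 'rxjs';
import { shareReplay, take } from 'rxjs/operators';

// Hypothetical helper: runs the same scenario with either refCount setting.
function probe(refCount: boolean): { subscriptions: number; got: number[] } {
  const producer$ = new Subject<number>();
  let subscriptions = 0; // how many times shareReplay subscribed to its source

  const counted$ = new Observable<number>(subscriber => {
    subscriptions++;
    return producer$.subscribe(subscriber);
  });
  const shared$ = counted$.pipe(shareReplay({ bufferSize: 1, refCount }));

  const got: number[] = [];
  shared$.pipe(take(1)).subscribe(v => got.push(v)); // first subscriber
  producer$.next(1); // delivered; take(1) then removes the first subscriber
  producer$.next(2); // arrives while nobody is subscribed to shared$
  shared$.pipe(take(1)).subscribe(v => got.push(v)); // late subscriber
  return { subscriptions, got };
}

const kept = probe(false);   // ReplaySubject survives, late subscriber gets 2
const dropped = probe(true); // ReplaySubject is torn down, 2 is lost
console.log(kept, dropped);
```

With refCount: false the source is subscribed once and the late subscriber immediately receives the cached 2. With refCount: true the source subscription is dropped as soon as the first subscriber leaves, so 2 is never cached and the late subscriber triggers a fresh subscription.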
I decided to use concatMap because I think it's safer for the ReplaySubject to have only one active subscriber at a time.
Considering this scheme:
---A-----B----C-----D-| first$
------1----------2----| second$
When A comes in, the ReplaySubject (from shareReplay) receives a new subscriber, which waits until second$ emits. When that happens, you get A1 and the inner observable completes (meaning that its subscriber is removed from the ReplaySubject's subscribers list). 1 is cached by the ReplaySubject.
Then B comes in: the newly created inner subscriber subscribes to second$ and receives 1 immediately, resulting in B1. The same goes for C.
Now comes an important part: the ReplaySubject should have no active subscribers when it receives a new value from its source, which is why I opted for take(1). When 2 arrives, the ReplaySubject has no active subscribers, so nothing is emitted; 2 simply replaces 1 in the cache.
Then D arrives and receives the latest stored value, 2, resulting in D2.
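The walkthrough above can be reproduced with a small sketch that uses Subjects and emits synchronously in the order of the diagram. The `source$` Subject and the `results` array are hypothetical names added for the demonstration; the pipeline itself is the one from this answer.

```typescript
import { Subject } from 'rxjs';
import { concatMap, map, shareReplay, take } from 'rxjs/operators';

const first$ = new Subject<string>();
const source$ = new Subject<string>(); // raw producer behind second$
const second$ = source$.pipe(shareReplay({ bufferSize: 1, refCount: false }));
const results: string[] = [];

first$
  .pipe(
    concatMap(firstVal =>
      second$.pipe(
        map(secondVal => `${firstVal}${secondVal}`),
        take(1), // complete after one value so no inner subscriber lingers
      ),
    ),
  )
  .subscribe(v => results.push(v));

// Same order of events as in the diagram.
first$.next('A');  // inner subscriber waits, nothing cached yet
source$.next('1'); // A1, and 1 is cached
first$.next('B');  // B1 (replayed 1)
first$.next('C');  // C1 (replayed 1)
source$.next('2'); // no active inner subscriber, 2 replaces 1 in the cache
first$.next('D');  // D2

console.log(results.join(' ')); // A1 B1 C1 D2
```

One caveat when the producer is hot, like the Subject here: shareReplay only subscribes to it once the first inner subscription is made, so anything second$ emits before first$ has emitted for the first time is not cached.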