How can I split a never ending stream into multiple ending streams based on a grouping method?
--a--a-a-a-a-b---b-b--b-c-c---c-c-d-d-d-e...>
into these observables
--a--a-a-a-a-|
b---b-b--b-|
c-c---c-c-|
d-d-d-|
e...>
As you can see, the stream starts with a, and once I receive b I will no longer get any a, so the a observable should complete. That's why the normal groupBy is not a good fit.
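To make the expected grouping concrete, here is a minimal sketch on a finite array instead of a stream. It uses plain JavaScript with no RxJS, and splitRuns is a hypothetical helper name that doesn't come from any library. It only illustrates the result being asked for, not how to get it from an Observable:

```javascript
// Hypothetical helper: split an array into runs of consecutive equal items.
// A run ends as soon as a different item shows up.
function splitRuns(items) {
  const runs = [];
  for (const item of items) {
    const last = runs[runs.length - 1];
    if (last !== undefined && last[0] === item) {
      last.push(item); // same key as the current run: extend it
    } else {
      runs.push([item]); // key changed: close the old run, start a new one
    }
  }
  return runs;
}

const runs = splitRuns('a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e'.split('-'));
console.log(runs.map(run => run.join('')));
```

Each inner array corresponds to one of the completed observables in the diagram above.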
You can use window and share the source Observable. There's also a little trick with bufferCount(2, 1):
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Rx.Observable.from(str.split('-'), Rx.Scheduler.async).share();
source
    .bufferCount(2, 1) // delay emission by one item
    .map(arr => arr[0])
    .window(source
        .bufferCount(2, 1) // keep the previous and current item
        .filter(([oldValue, newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);
This prints (because of toArray()):
[ 'a', 'a', 'a', 'a', 'a' ]
[ 'b', 'b', 'b', 'b' ]
[ 'c', 'c', 'c', 'c' ]
[ 'd', 'd', 'd' ]
[ 'e' ]
The problem with this solution is the order of subscriptions to source. We would need the window notifier to subscribe before the first bufferCount. Otherwise an item is first pushed downstream and only then checked against the previous one by .filter(([oldValue, newValue]) ...).
This means that we need to delay emissions by one item before window (that's what the first .bufferCount(2, 1).map(arr => arr[0]) does).
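If the bufferCount(2, 1) trick is hard to picture, here is a rough sketch of what it produces, emulated on a plain array without RxJS. slidingPairs is a hypothetical helper, and the short input is made up for illustration. The trailing one-item buffer is the leftover that gets flushed when the source completes:

```javascript
// Hypothetical helper: emulates bufferCount(2, 1) on a plain array.
// Every item starts a new buffer of up to two items; the last buffer
// holds a single item because nothing follows it.
function slidingPairs(items) {
  const pairs = [];
  for (let i = 0; i < items.length; i++) {
    pairs.push(items.slice(i, i + 2));
  }
  return pairs;
}

const items = 'a-a-b-b-c'.split('-');
const pairs = slidingPairs(items);

// Taking arr[0] gives back the original items, each one released only
// after the next item has arrived (or the input has ended).
const delayed = pairs.map(arr => arr[0]);

// Full pairs whose two values differ mark the places where a window closes.
const boundaries = pairs.filter(
  pair => pair.length === 2 && pair[0] !== pair[1]
);

console.log(pairs);
console.log(delayed);
console.log(boundaries);
```

So the main chain always lags one item behind the notifier, which is why the boundary fires before the first item of the next group reaches window.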
Or maybe it's easier to control the order of subscriptions myself with publish():
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Rx.Observable.from(str.split('-'), Rx.Scheduler.async).share();
const connectable = source.publish();
connectable
    .window(source
        .bufferCount(2, 1) // keep the previous and current item
        .filter(([oldValue, newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);
connectable.connect();
The output is the same.
Maybe someone can come up with something simpler but this works (fiddle: https://fiddle.jshell.net/uk01njgc/) ...
let counter = 0;
// 0, 0, 0, 1, 1, 1, ... one value per second, shared via publish()
let items = Rx.Observable.interval(1000)
    .map(value => Math.floor(value / 3))
    .publish();
// emits only when the value changes, i.e. at the start of each group
let distinct = items.distinctUntilChanged()
    .publish();
distinct
    .map(value => {
        // one inner observable per group: starts with the group's first
        // value and completes when the next group begins
        return items
            .startWith(value)
            .takeUntil(distinct);
    })
    .subscribe(obs => {
        let obsIndex = counter++;
        console.log('New observable');
        obs.subscribe(
            value => {
                console.log(obsIndex.toString() + ': ' + value.toString());
            },
            err => console.log(err),
            () => console.log('Completed observable')
        );
    });
distinct.connect();
items.connect();