RxJS 5 Angular 2 RC4 app written in Typescript 1.9:
I have two observables in a chain. I would like, if a condition is met in the 2nd, for the first to be completed immediately. My efforts seem unnecessarily complex. In the example below, I try to stop the first observable after it has emitted 3 values:
source = Observable.interval(1000)
.do(()=>this.print("*******EMITTING from Source*******"))
.switchMap(count => {
if(count<3){ //just pass along the value
return Observable.create(observer=>{
observer.next(count);observer.complete()
})
}
else{ //abort by issuing a non-productive observable
return Observable.create(observer=>
observer.complete()
)
}
})
this.source.subscribe(count=>this.print('Ouput is '+count);
Here is the output:
*******EMITTING from Source*******
Output is 0
*******EMITTING from Source*******
Output is 1
*******EMITTING from Source*******
Output is 2
*******EMITTING from Source*******
*******EMITTING from Source*******
*******EMITTING from Source*******
So, functionally I get the result I want because the wider script stops getting notifications after three outputs. However, I'm sure there is a better way. My problems are:
You can use take operator to do it.take takes the first N events and completes the stream.
this.source = Observable.interval(1000)
.do(()=>this.print("*******EMITTING from Source*******"))
.take(3);
this.source.subscribe(count=>this.print('Ouput is '+count);
Your example's stream doesn't complete because switchMap's outer stream doesn't complete when inner streams complete. switchMap() is equal to map().switch(). In your example, the map part emits observables like:
And the switch part switches those observables and keeps waiting for upcoming observables.
EDIT
Your example also could be written as:
source = Observable.interval(1000)
.do(()=>this.print("*******EMITTING from Source*******"))
.takeWhile(count => count < 3);
EDIT 2
Regarding your comment, if you want to terminate the stream if the inner stream emits true:
source = Observable.interval(1000)
.do(()=>this.print("*******EMITTING from Source*******"))
.switchMap(count => createSomeObservable(count))
.takeWhile(x => x !== true);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With