Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Can an RxJS 5 Observable source be stopped by another down the chain?

RxJS 5 Angular 2 RC4 app written in Typescript 1.9:

I have two observables in a chain. I would like, if a condition is met in the 2nd, for the first to be completed immediately. My efforts seem unnecessarily complex. In the example below, I try to stop the first observable after it has emitted 3 values:

source = Observable.interval(1000)
        .do(()=>this.print("*******EMITTING from Source*******"))
        .switchMap(count => {
            if(count<3){ //just pass along the value
                return Observable.create(observer=>{
                    observer.next(count);observer.complete()
                })                
            }
            else{ //abort by issuing a non-productive observable
                return Observable.create(observer=>
                    observer.complete()
                )
            }
        })
this.source.subscribe(count=>this.print('Ouput is '+count);

Here is the output:

*******EMITTING from Source*******
Output is 0
*******EMITTING from Source*******
Output is 1
*******EMITTING from Source*******
Output is 2
*******EMITTING from Source*******
*******EMITTING from Source*******
*******EMITTING from Source*******

So, functionally I get the result I want because the wider script stops getting notifications after three outputs. However, I'm sure there is a better way. My problems are:

  1. The upstream observable continues to emitting forever. How can I stop it?
  2. I'm creating a new observable down the chain on every emission. Shouldn't I be able to just pass along the first 3 values but abort or complete the chain on the 4th?
like image 275
BeetleJuice Avatar asked Jan 24 '26 17:01

BeetleJuice


1 Answers

You can use take operator to do it.take takes the first N events and completes the stream.

this.source = Observable.interval(1000)
  .do(()=>this.print("*******EMITTING from Source*******"))
  .take(3);
this.source.subscribe(count=>this.print('Ouput is '+count);

Your example's stream doesn't complete because switchMap's outer stream doesn't complete when inner streams complete. switchMap() is equal to map().switch(). In your example, the map part emits observables like:

  1. next(0), complete()
  2. next(1), complete()
  3. next(2), complete()
  4. complete()
  5. complete()
  6. complete()
  7. complete()
  8. ...(continues infinitely)...

And the switch part switches those observables and keeps waiting for upcoming observables.

EDIT

Your example also could be written as:

source = Observable.interval(1000)
        .do(()=>this.print("*******EMITTING from Source*******"))
        .takeWhile(count => count < 3);

EDIT 2

Regarding your comment, if you want to terminate the stream if the inner stream emits true:

source = Observable.interval(1000)
        .do(()=>this.print("*******EMITTING from Source*******"))
        .switchMap(count => createSomeObservable(count))
        .takeWhile(x => x !== true);
like image 197
Shuhei Kagawa Avatar answered Jan 26 '26 23:01

Shuhei Kagawa



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!