I've got 2 nested Observable Streams which do HTTP requests. Now I'd like to display a loading indicator, but can't get it working correctly.
// Stream of page requests: starts with page 1, then continues with whatever the
// 'nextPage' observable function emits.
// NOTE(review): presumably Rx.createObservableFunction installs `nextPage` on _self
// and emits the arguments it is called with (RxJS 4 API) — confirm against the docs.
var pageStream = Rx.createObservableFunction(_self, 'nextPage')
        .startWith(1)
        .do(function(pageNumber) {
            // Counted as pending as soon as a page number is emitted, i.e. before the request runs.
            pendingRequests++;
        })
        .concatMap(function(pageNumber) {
            return MyHTTPService.getPage(pageNumber);
        })
        .do(function(response) {
            // Runs only when a response is emitted; it is skipped when the inner
            // stream is unsubscribed before the response arrives.
            pendingRequests--;
        });
// Every 'search' emission switches to a fresh subscription of pageStream;
// flatMapLatest drops the previous inner subscription when it does so.
Rx.createObservableFunction(_self, 'search')
        .flatMapLatest(function(e) {
            return pageStream;
        })
        .subscribe();
// Sample call sequence that reproduces the counter imbalance described in the question.
search();
nextPage(2);
nextPage(3);
search();
This will trigger pendingRequests++ 4 times, but pendingRequests-- only once, because flatMapLatest will cancel the inner observable before the first 3 HTTP responses arrive.
I couldn't find anything like an onCancel callback. I also tried onCompleted and onError, but those too won't get triggered by flatMapLatest.
Is there any other way to get this working?
Thank you!
Example: Single search() call.
Example: search() and nextPage() call. (nextPage() is called before search() response came back.)
Example: search(), search(). (search() calls override each other, though the response of the first one can be dismissed)
Example: search(), nextPage(), search(). (Again: Because of the second search(), the responses from the previous search() and nextPage() can be ignored)
Example: search(), nextPage(). But this time nextPage() is called after search() response came back.
I tried using pendingRequests counter, because I can have multiple relevant requests at the same time (for example: search(), nextPage(), nextPage()). Then of course I'd like to disable the loading indicator after all those relevant requests finished.
When calling search(), search(), the first search() is irrelevant. Same applies for search(), nextPage(), search(). In both cases there's only one active relevant request (the last search()).
With switchMap (aka flatMapLatest) you want to cut off execution of the current inner stream as soon as new outer items arrive. It is surely a good design decision, as otherwise it would bring a lot of confusion and allow some spooky actions. If you really wanted to do something onCancel, you can always create your own observable with a custom unsubscribe callback. But I would still recommend not coupling unsubscribe with changing the state of the external context. Ideally the unsubscribe would only clean up internally used resources.
Nevertheless, your particular case can be solved without accessing onCancel or similar. The key observation is - if I understood your use case correctly - that on search all previous / pending actions may be ignored. So instead of worrying about decrementing the counter, we can simply start counting from 1.
Some remarks about the snippet:
- A BehaviorSubject is used for counting pending requests, as it is ready to be composed with other streams;
- handling nextPage when a search is still pending is left open, but it seems to be just a matter of using concatMapTo vs merge;
- the solution uses only Rx operators.
PLNKR
// Start from an empty console so each run's log can be read on its own.
console.clear();
const searchSub = new Rx.Subject(); // trigger search
const nextPageSub = new Rx.Subject(); // trigger nextPage
// Created without an initial value, so `pendingSub.value` is undefined until the
// first search pushes 1 into it.
const pendingSub = new Rx.BehaviorSubject(); // counts number of pending requests
// Curried factory: randDurationFactory(min)(max) yields a zero-argument function
// that returns a random number in the range [min, max).
const randDurationFactory = function (min) {
  return function (max) {
    const span = max - min;
    return function () {
      return Math.random() * span + min;
    };
  };
};
// Random latency used for the simulated requests: between 250 and 750.
const randDuration = randDurationFactory(250)(750);
// Builds a callback that shifts the pending-request counter by `n`:
// it reads the current value of pendingSub and pushes value + n back into it.
const addToPending = function (n) {
  return function () {
    const updated = pendingSub.value + n;
    return pendingSub.next(updated);
  };
};
const inc = addToPending(1); // one more request in flight
const dec = addToPending(-1); // one request finished
// Simulated search request for `x`: logs the start, waits a random duration
// (randDuration), then logs success and emits the timer's value.
const fakeSearch = (x) => {
  const logStart = () => console.log(`SEARCH-START: ${x}`);
  const logSuccess = () => console.log(`SEARCH-SUCCESS: ${x}`);
  const simulateLatency = () => Rx.Observable.timer(randDuration()).do(logSuccess);

  return Rx.Observable.of(x)
    .do(logStart)
    .flatMap(simulateLatency);
};
// Simulated next-page request for `x`: logs the start, waits a random duration
// (randDuration), then logs success and emits the timer's value.
const fakeNextPage = (x) => {
  const logStart = () => console.log(`NEXT-PAGE-START: ${x}`);
  const logSuccess = () => console.log(`NEXT-PAGE-SUCCESS: ${x}`);
  const simulateLatency = () => Rx.Observable.timer(randDuration()).do(logSuccess);

  return Rx.Observable.of(x)
    .do(logStart)
    .flatMap(simulateLatency);
};
// subscribes
// Main pipeline: every search resets the pending counter to 1 and, through switchMap,
// replaces the previous search together with the nextPage handling chained after it.
searchSub
  .do(() => console.warn('NEW_SEARCH'))
  .do(() => pendingSub.next(1)) // new search -- ignore current state
  .switchMap(
    (x) => fakeSearch(x)
    .do(dec) // search ended
    .concatMapTo(nextPageSub // if you wanted to block nextPage when search still pending
      // .merge(nextPageSub // if you wanted to allow nextPage when search still pending
      .do(inc) // nextPage started
      .flatMap(fakeNextPage) // optionally switchMap
      .do(dec) // nextPage ended
    )
  ).subscribe();
// Log every change of the pending-request counter.
pendingSub
  .filter((pending) => typeof pending !== 'undefined') // the behavior value is initially not defined
  .subscribe((pending) => {
    console.log('PENDING-REQUESTS', pending);
  });
// TEST
// Fixed scenario: one search, two next-page requests right after it,
// and a second search fired 200 ms later.
const test = () => {
  const fireSecondSearch = () => searchSub.next('s2');

  searchSub.next('s1');
  for (const page of ['p1', 'p2']) {
    nextPageSub.next(page);
  }
  setTimeout(fireSecondSearch, 200);
};
// test();
// FUZZY-TEST
// fuzzyTest stops rescheduling itself once `counter` reaches this value.
const COUNTER_MAX = 50;
// Random delay between two fuzzyTest rounds: a number between 10 and 350, passed to setTimeout.
const randInterval = randDurationFactory(10)(350);
// Number of events emitted so far; also used as the suffix of each event label.
let counter = 0;
// One round of the randomized scenario: whenever the counter is a multiple of 10 a new
// search is fired first; a next-page request is always fired; then the function
// reschedules itself after a random interval until COUNTER_MAX events have been emitted.
const fuzzyTest = () => {
  const startsNewSearch = counter % 10 === 0;
  if (startsNewSearch) {
    const searchLabel = `s${counter}`;
    counter += 1;
    searchSub.next(searchLabel);
  }

  const pageLabel = `p${counter}`;
  counter += 1;
  nextPageSub.next(pageLabel);

  if (counter >= COUNTER_MAX) {
    return;
  }
  setTimeout(fuzzyTest, randInterval());
};
fuzzyTest()
<script src="https://npmcdn.com/[email protected]/bundles/Rx.umd.js"></script>
One way: use the finally operator (rxjs4 docs, rxjs5 source). Finally triggers whenever the observable is unsubscribed or completes for any reason.
I'd also move the counter logic to inside the concatMap function since you are really counting the getPage requests, not the number of values that have gone through. It's a subtle difference.
// Page stream that counts getPage requests instead of emitted values: the counter goes up
// right before a request is created and goes down in `finally`, which also runs when the
// request is unsubscribed before it responds.
var pageStream = Rx.createObservableFunction(_self, 'nextPage')
    .startWith(1)
    .concatMap(function (pageNumber) {
        var releasePending = function () {
            pendingRequests--;
        };

        pendingRequests++;
        // assumes getPage returns an Observable and not a Promise
        var pageRequest = MyHTTPService.getPage(pageNumber);
        return pageRequest.finally(releasePending);
    });
I wrote a solution for your problem from scratch.
For sure it might be written in a more functional way but it works anyway. 
This solution is based on reqStack which contains all requests (keeping the call order) where a request is an object with id, done and type properties.
When the request is done then requestEnd method is called.
There are two conditions and at least one of them is enough to hide a loader.
- If the request that just finished is the last one on the stack and it is a search request, then we can hide a loader.
- Otherwise, all other requests have to be already done.
/**
 * Creates an independent loader tracker.
 *
 * The returned object keeps every started request in `reqStack` (in call order) and
 * toggles `loaderVisible` as requests start and finish. A request is a plain object
 * `{id, done, type}` where `type` is 'search' or 'nextPage'.
 *
 * @returns {{loaderVisible: boolean, reqStack: Array, requestStart: Function,
 *            requestEnd: Function, hideLoader: Function, getPage: Function,
 *            search: Function, nextPage: Function}} a fresh tracker instance
 */
function getInstance() {
    /**
     * Shared implementation of search() and nextPage(): registers the request, starts it
     * and maps its response to the boolean returned by requestEnd().
     */
    function startRequest(instance, type, id, delay) {
        const req = {id: id, done: false, type: type};
        instance.reqStack.push(req);
        return instance.getPage(req, delay).map(body => {
            // Mark the request that actually finished. Looking it up again by id and type
            // would pick the first match and mark the wrong entry when ids repeat.
            req.done = true;
            return instance.requestEnd(req, body, delay);
        });
    }

    return {
        loaderVisible: false,
        reqStack: [],

        /** Logs the start of `req` and shows the loader if it is not visible yet. */
        requestStart: function (req) {
            console.log('%s%s req start', req.type, req.id);
            const hasPending = this.reqStack.some(r => !r.done);
            if (hasPending && !this.loaderVisible) {
                this.loaderVisible = true;
                console.log('loader visible');
            }
        },

        /**
         * Logs the end of `req` and decides whether the loader can be hidden.
         *
         * @returns {boolean} true when this response hid the loader, false otherwise
         */
        requestEnd: function (req, body, delay) {
            console.log('%s%s req end (took %sms), body: %s', req.type, req.id, delay, body);
            const isLatest = req === this.reqStack[this.reqStack.length - 1];
            // The most recent request is a search: everything before it is irrelevant.
            if (isLatest && req.type === 'search') {
                this.hideLoader(req);
                return true;
            }
            // Otherwise the loader goes away only once every request is done.
            if (this.loaderVisible && this.reqStack.every(r => r.done === true)) {
                this.hideLoader(req);
                return true;
            }
            return false;
        },

        /** Hides the loader and logs which request caused it. */
        hideLoader: function (req) {
            this.loaderVisible = false;
            console.log('loader hidden (after %s%s request)', req.type, req.id);
        },

        /**
         * Simulates an HTTP request: an observable emitting a random body after `delay`.
         */
        getPage: function (req, delay) {
            this.requestStart(req);
            return Rx.Observable
                .fromPromise(Promise.resolve("<body>" + Math.random() + "</body>"))
                .delay(delay);
        },

        /** Starts a search request; emits whether its response hid the loader. */
        search: function (id, delay) {
            return startRequest(this, 'search', id, delay);
        },

        /** Starts a next-page request; emits whether its response hid the loader. */
        nextPage: function (id, delay) {
            return startRequest(this, 'nextPage', id, delay);
        },
    };
}
Unit tests in Mocha:
describe('animation loader test:', function () {
    // Time (ms) to wait before finishing a test; longer than every simulated delay used below.
    var SETTLE_MS = 200;
    // Tracker under test, recreated before each test.
    var loader;

    // Subscriber asserting that this response hid the loader.
    function expectDidHideLoader(didHide) {
        expect(didHide).to.be.true;
    }

    // Subscriber asserting that this response did NOT hide the loader.
    function expectDidNOTHideLoader(didHide) {
        expect(didHide).to.be.false;
    }

    // Finishes the test after a fixed delay.
    function testDone(done) {
        setTimeout(function () {
            done();
        }, SETTLE_MS);
    }

    beforeEach(function () {
        loader = getInstance();
    });

    it('search', function (done) {
        loader.search('1', 10).subscribe(expectDidHideLoader);
        testDone(done);
    });

    it('search, nextPage', function (done) {
        loader.search('1', 50).subscribe(expectDidHideLoader);
        loader.nextPage('1', 20).subscribe(expectDidNOTHideLoader);
        testDone(done);
    });

    it('search, nextPage, nextPage', function (done) {
        loader.search('1', 50).subscribe(expectDidHideLoader);
        loader.nextPage('1', 40).subscribe(expectDidNOTHideLoader);
        loader.nextPage('2', 30).subscribe(expectDidNOTHideLoader);
        testDone(done);
    });

    it('search, nextPage, nextPage - reverse', function (done) {
        loader.search('1', 30).subscribe(expectDidNOTHideLoader);
        loader.nextPage('1', 40).subscribe(expectDidNOTHideLoader);
        loader.nextPage('2', 50).subscribe(expectDidHideLoader);
        testDone(done);
    });

    it('search, search', function (done) {
        // the first search is dismissed even though it takes more time than search2
        loader.search('1', 60).subscribe(expectDidNOTHideLoader);
        loader.search('2', 50).subscribe(expectDidHideLoader);
        testDone(done);
    });

    it('search, search - reverse', function (done) {
        loader.search('1', 40).subscribe(expectDidNOTHideLoader);
        loader.search('2', 50).subscribe(expectDidHideLoader);
        testDone(done);
    });

    it('search, nextPage, search', function (done) {
        // both earlier requests are dismissed even though they take more time than search2
        loader.search('1', 40).subscribe(expectDidNOTHideLoader);
        loader.nextPage('1', 30).subscribe(expectDidNOTHideLoader);
        loader.search('2', 10).subscribe(expectDidHideLoader);
        testDone(done);
    });

    it('search, nextPage (call after response from search)', function (done) {
        loader.search('1', 10).subscribe(function (result) {
            expectDidHideLoader(result);
            loader.nextPage('1', 10).subscribe(expectDidHideLoader);
        });
        testDone(done);
    });
});
Part of the output:

JSFiddle is here.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With