
RxJS: dynamically add and remove Observable with merge

I'm trying to build a pluggable ProgressService.

The service should track how many "things" are currently loading and expose an isLoading() method that returns an Observable<boolean> indicating whether anything is loading at all.

My first solution was naive: a new BehaviorSubject(0), with each loading provider calling ProgressService.increase() and ProgressService.decrease(). This worked fine, but now I'd like to go more reactive, if possible.


Then I came across merge, which works fine when all loading providers are known at the beginning:

this.progress = merge(...progressProviders).pipe(
  scan((acc, curr) => acc + (curr ? 1 : -1), 0)
);

This simply increases the progress value whenever any loading provider emits true and decreases it whenever one emits false.
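To make the counting rule concrete, the accumulator passed to scan can be traced over plain arrays, without RxJS. This is only an illustration: the helper name runningCounts is hypothetical, and the arrays stand in for the merged emissions of all providers.

```typescript
// Same accumulator as in the scan() call above.
const step = (acc: number, curr: boolean): number => acc + (curr ? 1 : -1);

// Hypothetical helper: mimics scan() by returning every intermediate total.
function runningCounts(events: boolean[]): number[] {
  const totals: number[] = [];
  events.reduce((acc, curr) => {
    const next = step(acc, curr);
    totals.push(next);
    return next;
  }, 0);
  return totals;
}

// Two providers start loading, then both finish.
const balanced = runningCounts([true, true, false, false]); // [1, 2, 1, 0]

// One provider emits false first, then another starts loading.
const unbalanced = runningCounts([false, true]); // [-1, 0]
```

The second trace shows why an initial false matters: the total drops to -1 and returns to 0 while a provider is still loading, so a check like val !== 0 would report the wrong state.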

But I also need some kind of register / unregister functionality. This should basically add a new Observable to the merge chain (or remove it).


The new ProgressService should look like this:

class ProgressService {

  progress: Observable<number> = EMPTY; // value > 0 when something is loading

  addLoadingProvider(lp: Observable<boolean>) {
    // increment this.progress, when lp emits true
    // decrease this.progress, when lp emits false
    // nice to have: ignore if lp's first emitted value is false
  }

  removeLoadingProvider(lp: Observable<boolean>) {
    // stop listening to lp
    // clean up: decrease this.progress, if lp last emitted true
  }

  isLoading(): Observable<boolean> {
    return this.progress.pipe(
      map(val => val !== 0)
    );
  }

}

Maybe the removeLoadingProvider method isn't needed at all (?), if we return a Subscription from addLoadingProvider, and use Subscription.unsubscribe() to unregister.

Hopefully someone can tell me how to merge and unmerge additional Observables on demand.

Benjamin M asked Oct 28 '25 05:10



1 Answer

As I understand your explanation (please correct me if I'm wrong):

You want to collect several observables that emit boolean values and track them so that the final observable emits true if at least one of them last emitted true, and false otherwise.

Your BehaviorSubject approach is already reactive, but I'd propose the following approaches. Let me know if they make sense in your scenario.

Approach 1 -

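import { BehaviorSubject, EMPTY, Observable, combineLatest, of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

enum ListAction {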
  Added,
  Removed,
  Empty,
  Undefined
}

export class ProgressService {

  constructor() { }

  progress: Observable<number> = EMPTY; // value > 0 when something is loading

  obsListChanged: BehaviorSubject<ListAction> = new BehaviorSubject<ListAction>(ListAction.Undefined);
  obsList: Array<Observable<boolean>> = [];

  addLoadingProvider(lp: Observable<boolean>) {
    // increment this.progress, when lp emits true
    // decrease this.progress, when lp emits false
    // nice to have: ignore if lp's first emitted value is false
    this.obsList.push(lp);

    this.obsListChanged.next(ListAction.Added);
  }

  removeLoadingProvider(lp: Observable<boolean>) {
    // stop listening to lp
    // clean up: decrease this.progress, if lp last emitted true
    this.obsList = this.obsList.filter(i => i !== lp);
    this.obsListChanged.next(ListAction.Removed);
  }

  isLoading(): Observable<boolean> {
    // return this.progress.pipe(
    //   map(val => val !== 0)
    // );
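    return this.obsListChanged.pipe(
      // Rebuild the combined stream every time the provider list changes.
      switchMap(() => {
        // combineLatest([]) completes without emitting, so emit [] for an empty list.
        // Note: combineLatest emits only after every provider has emitted at least once.
        return this.obsList.length ? combineLatest(this.obsList) : of([] as boolean[]);
      }),
      // Loading if at least one provider's latest value is true.
      map(v => {
        return v.some(loading => loading);
      })
    );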
  }
}

I have defined a ListAction enum, and obsListChanged emits one of its values on every list change. If you need to do specific work for a particular ListAction, you can handle it in the RxJS operator chain.
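As a rough sketch of reacting to one particular ListAction, the change stream can be narrowed with filter. The helper onListAction and the names listChanged$ and removals$ are hypothetical and only stand in for obsListChanged from Approach 1.

```typescript
import { BehaviorSubject, Observable } from 'rxjs';
import { filter } from 'rxjs/operators';

enum ListAction {
  Added,
  Removed,
  Empty,
  Undefined
}

// Hypothetical helper: passes a change through only when it matches `action`.
function onListAction(
  changes$: Observable<ListAction>,
  action: ListAction
): Observable<ListAction> {
  return changes$.pipe(filter(a => a === action));
}

// Stand-in for obsListChanged; removals$ emits only on ListAction.Removed.
const listChanged$ = new BehaviorSubject<ListAction>(ListAction.Undefined);
const removals$ = onListAction(listChanged$, ListAction.Removed);
```

Subscribers to removals$ could then run cleanup logic without caring about additions.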

Approach 2 (a slightly improved version of Approach 1) -

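import { BehaviorSubject, EMPTY, Observable, combineLatest, of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

export class ProgressService {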

  constructor() { }

  progress: Observable<number> = EMPTY; // value > 0 when something is loading

  obsList$: BehaviorSubject<Array<Observable<boolean>>> = new BehaviorSubject<Array<Observable<boolean>>>([]);

  addLoadingProvider(lp: Observable<boolean>) {
    // increment this.progress, when lp emits true
    // decrease this.progress, when lp emits false
    // nice to have: ignore if lp's first emitted value is false
    this.obsList$.next([...this.obsList$.getValue(), lp]);
  }

  removeLoadingProvider(lp: Observable<boolean>) {
    // stop listening to lp
    // clean up: decrease this.progress, if lp last emitted true
    const removed = this.obsList$.getValue().filter(i => i !== lp);
    this.obsList$.next(removed);
  }

  isLoading(): Observable<boolean> {
    // return this.progress.pipe(
    //   map(val => val !== 0)
    // );
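    return this.obsList$.pipe(
      // Rebuild the combined stream every time the provider list changes.
      switchMap(obs => {
        // combineLatest([]) completes without emitting, so emit [] for an empty list.
        return obs.length ? combineLatest(obs) : of([] as boolean[]);
      }),
      // Loading if at least one provider's latest value is true.
      map(v => {
        return v.some(loading => loading);
      })
    );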
  }
}
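
For comparison, here is a minimal sketch that stays closer to the merge / scan idea from the question and lets addLoadingProvider return a Subscription for unregistering. It is not one of the two approaches above. The names providers$, count$ and stop$ are hypothetical, and the sketch assumes that a provider which completes or is unregistered while loading should release its slot.

```typescript
import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
import {
  distinctUntilChanged, endWith, map, mergeAll, scan, skip, startWith, takeUntil
} from 'rxjs/operators';

class ProgressService {
  // Stream of per-provider delta streams (+1 / -1); hypothetical name.
  private readonly providers$ = new Subject<Observable<number>>();
  // Current number of loading providers; starts at 0.
  private readonly count$ = new BehaviorSubject<number>(0);

  readonly progress: Observable<number> = this.count$.asObservable();

  constructor() {
    // Subscribe eagerly so providers registered before any consumer are not missed.
    this.providers$
      .pipe(mergeAll(), scan((acc, delta) => acc + delta, 0))
      .subscribe(this.count$);
  }

  // Unsubscribing the returned Subscription unregisters the provider.
  addLoadingProvider(lp: Observable<boolean>): Subscription {
    const stop$ = new Subject<void>();
    this.providers$.next(
      lp.pipe(
        takeUntil(stop$),
        endWith(false),         // on stop or completion, treat the provider as idle
        startWith(false),       // baseline, so an initial false is not counted
        distinctUntilChanged(), // keep only real state changes
        skip(1),                // drop the synthetic baseline
        map(loading => (loading ? 1 : -1))
      )
    );
    return new Subscription(() => {
      stop$.next();
      stop$.complete();
    });
  }

  isLoading(): Observable<boolean> {
    return this.count$.pipe(map(n => n > 0), distinctUntilChanged());
  }
}
```

With this sketch, const sub = service.addLoadingProvider(lp) registers a provider and sub.unsubscribe() removes it, so a separate removeLoadingProvider is not required. Error handling for failing providers is left out.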
user2216584 answered Oct 29 '25 19:10



