
RxJS: Even when `shareReplay()` is used on the source observable, `throwError()` gets executed separately for each observable

I'm using the RxJS shareReplay() operator on an observable (courses$) to share its stream between two other observables (beginnerCourses$ and advancedCourses$). This works fine on success: a single API call response is shared between both observables.

But when it comes to errors, these observables don't share the error, and it is thrown twice in the browser console. Doesn't the shareReplay() operator share errors as well? Is this intended behavior?

const http$ = createHttpObservable('/api/courses');

const courses$ = http$
  .pipe(
    map(res => res['payload'] ),
    shareReplay(),
    catchError(err => {
      return throwError(err);
    })
  );

this.beginnerCourses$ = courses$
  .pipe(
    map(courses => courses
      .filter(course => course.category === 'BEGINNER')));

this.advancedCourses$ = courses$
  .pipe(
    map(courses => courses
      .filter(course => course.category === 'ADVANCED')));

Gourav Pokharkar asked Oct 20 '25 08:10



2 Answers

I think it's the expected behavior; what's a bit unexpected is that you're getting 2 different errors.

shareReplay places a ReplaySubject between the data consumer(s) and the data producer. When an error notification arrives, the ReplaySubject in use will send the same error notification to all the registered subscribers:

error(err: any) {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  }
  this.hasError = true;
  this.thrownError = err;
  this.isStopped = true;
  const { observers } = this;
  const len = observers.length;
  const copy = observers.slice();
  for (let i = 0; i < len; i++) {
    copy[i].error(err);
  }
  this.observers.length = 0;
}

Source

However, with shareReplay, once an error has occurred, the ReplaySubject in use is replaced by a new one when the next subscriber subscribes. Replacing it also entails re-subscribing to the source.

So I think all the subscribers should receive the same error notification as long as they are already part of the ReplaySubject's list of subscribers. Otherwise, when a new subscriber comes in, the source will be re-subscribed.
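The reset-on-error behavior described above can be modeled without RxJS. The following is only a simplified sketch, not RxJS's implementation: the names `sharedWithReset`, `Observer`, and `Source` are hypothetical, value replay and completion are left out, and the demo source is triggered by hand to imitate an asynchronous failure.

```typescript
// Simplified model (NOT RxJS): one shared connection that is discarded on error.
type Observer<T> = { next: (value: T) => void; error: (err: unknown) => void };
type Source<T> = (observer: Observer<T>) => void;

function sharedWithReset<T>(source: Source<T>): Source<T> {
  // null means "no active connection", so the next subscriber re-runs the source
  let observers: Observer<T>[] | null = null;

  return (observer) => {
    if (observers !== null) {
      observers.push(observer); // join the connection that is already running
      return;
    }
    observers = [observer];
    const current = observers;
    source({
      next: (value) => current.slice().forEach((o) => o.next(value)),
      error: (err) => {
        if (observers === current) observers = null; // reset before notifying
        current.slice().forEach((o) => o.error(err)); // same error for everyone registered
      },
    });
  };
}

// Demo: a source that counts its executions and fails only when told to.
let runs = 0;
let fail: (err: unknown) => void = () => {};
const source: Source<number> = (observer) => {
  runs += 1;
  fail = observer.error; // keep a handle so the error can be triggered later
};

const shared = sharedWithReset(source);
const received: string[] = [];
shared({ next: () => {}, error: (e) => received.push(`A:${String(e)}`) });
shared({ next: () => {}, error: (e) => received.push(`B:${String(e)}`) });

fail('boom'); // A and B were both registered: one execution, one shared error
shared({ next: () => {}, error: (e) => received.push(`C:${String(e)}`) }); // late subscriber

console.log(runs);     // 2: the late subscriber caused a second execution
console.log(received); // [ 'A:boom', 'B:boom' ]
```

In this model, subscribers registered before the failure see the same error from a single execution, while a subscriber arriving afterwards starts the source again.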


To prevent the ReplaySubject from receiving error notifications, while still allowing its subscribers to receive them as they are, you could use the materialize and dematerialize operators:

const courses$ = http$
  .pipe(
    map(res => res['payload'] ), // Extract the payload before wrapping events
    materialize(), // Everything as a `next` notification
    shareReplay(),
    dematerialize() // Back to the original event
  );

With this approach, if a registered subscriber receives the error notification, it will be unsubscribed, meaning that it will also be removed from the ReplaySubject's list of subscribers. But the ReplaySubject will still be there and will not be replaced for subsequent subscribers.
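The idea behind this approach can be sketched without RxJS. This is a simplified model under stated assumptions, not how the operators are implemented: `cacheNotifications`, `Observer`, and `Source` are hypothetical names, and completion is ignored. Errors are stored as plain notification values in a cache that is never reset, then converted back into error calls on delivery.

```typescript
// Simplified model (NOT RxJS): cache events as values, re-emit them on delivery.
type Observer<T> = { next: (value: T) => void; error: (err: unknown) => void };
type Source<T> = (observer: Observer<T>) => void;
type Notification<T> = { kind: 'N'; value: T } | { kind: 'E'; error: unknown };

function cacheNotifications<T>(source: Source<T>): Source<T> {
  const cached: Notification<T>[] = []; // survives errors, never replaced
  const waiting: Observer<T>[] = [];    // subscribers still expecting events
  let started = false;

  // "Dematerialize": turn a stored notification back into a real event.
  const deliver = (observer: Observer<T>, n: Notification<T>): void => {
    if (n.kind === 'N') observer.next(n.value);
    else observer.error(n.error);
  };

  // "Materialize": store the event as a value, then pass it on.
  const record = (n: Notification<T>): void => {
    cached.push(n);
    // After an error, waiting subscribers are done and get removed.
    const targets = n.kind === 'E' ? waiting.splice(0) : waiting.slice();
    targets.forEach((observer) => deliver(observer, n));
  };

  return (observer) => {
    cached.forEach((n) => deliver(observer, n));    // replay what is already known
    if (cached.some((n) => n.kind === 'E')) return; // error replayed, nothing more to come
    waiting.push(observer);
    if (!started) {
      started = true; // the source is executed at most once
      source({
        next: (value) => record({ kind: 'N', value }),
        error: (error) => record({ kind: 'E', error }),
      });
    }
  };
}

// Demo: a source that fails immediately and counts its executions.
let executions = 0;
const failing: Source<string> = (observer) => {
  executions += 1;
  observer.error('boom');
};

const cachedSource = cacheNotifications(failing);
const errors: string[] = [];
cachedSource({ next: () => {}, error: (e) => errors.push(`first:${String(e)}`) });
cachedSource({ next: () => {}, error: (e) => errors.push(`late:${String(e)}`) });

console.log(executions); // 1: the late subscriber did not re-run the source
console.log(errors);     // [ 'first:boom', 'late:boom' ]
```

Because the cache only ever sees values, it has no reason to reset, so a late subscriber gets the stored error replayed instead of triggering a new execution.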


Also, I think this is pretty redundant:

catchError(err => throwError(err));
Andrei Gătej answered Oct 23 '25 03:10



This is expected behavior: when the source observable throws an error, shareReplay() will resubscribe to (re-execute) the source for the next subscriber.

You can try this to validate it:

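import { defer, throwError } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// 'run' is logged each time the source is executed
const a = defer(() => {
  console.log('run')
  return throwError(new Error('Error'))
}).pipe(shareReplay())

// The second subscription re-executes the source after the first one errored
a.subscribe(console.log, console.error, () => console.log('complete'))
a.subscribe(console.log, console.error, () => console.log('complete'))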

https://stackblitz.com/edit/typescript-jutfxe

If you want the observable to share the error without executing the source again, use publishReplay(1), refCount() instead.

Fan Cheung answered Oct 23 '25 03:10



