I'm using the RxJS shareReplay() operator on an observable (courses$) to share its stream between two other observables (beginnerCourses$ and advancedCourses$). It works fine: on success, the response of a single API call is shared between both observables.
But when an error occurs, these observables don't seem to share it, and the error is thrown twice in the browser console. Doesn't shareReplay() share errors as well? Is this the intended behavior?
const http$ = createHttpObservable('/api/courses');
const courses$ = http$
  .pipe(
    map(res => res['payload']),
    shareReplay(),
    catchError(err => {
      return throwError(err);
    })
  );
this.beginnerCourses$ = courses$
  .pipe(
    map(courses => courses
      .filter(course => course.category === 'BEGINNER')));
this.advancedCourses$ = courses$
  .pipe(
    map(courses => courses
      .filter(course => course.category === 'ADVANCED')));
I think this is the expected behavior. What's a bit unexpected is that you're getting two different errors.
shareReplay places a ReplaySubject between the data consumer(s) and the data producer. When an error notification arrives, the ReplaySubject in use will send the same error notification to all the registered subscribers:
error(err: any) {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  }
  this.hasError = true;
  this.thrownError = err;
  this.isStopped = true;
  const { observers } = this;
  const len = observers.length;
  const copy = observers.slice();
  for (let i = 0; i < len; i++) {
    copy[i].error(err);
  }
  this.observers.length = 0;
}
Source
However, with shareReplay, once an error has occurred the ReplaySubject in use is replaced by a new one when the next subscriber subscribes. Replacing it also entails re-subscribing to the source.
So I think all the subscribers should receive the same error notification as long as they are already part of the ReplaySubject's list of subscribers. Otherwise, when a new subscriber comes in, the source will be re-subscribed.
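The difference between subscribers that are already registered and one that arrives after the error can be sketched with a small dependency-free toy model. This is only an illustration of the idea described above, not RxJS's implementation; ToyObserver, ToySource, shareWithResetOnError, and the manually flushed source are all hypothetical names made up for this sketch.

```typescript
// Toy model for illustration only; this is NOT RxJS's implementation.
type ToyObserver<T> = { next: (value: T) => void; error: (err: unknown) => void };
type ToySource<T> = (observer: ToyObserver<T>) => void;

// Shares one source subscription among observers. After an error the shared
// state is dropped, so the next subscriber triggers a fresh source subscription.
function shareWithResetOnError<T>(source: ToySource<T>): ToySource<T> {
  let observers: ToyObserver<T>[] | null = null;
  return (observer) => {
    if (observers !== null) {
      observers.push(observer); // join the subscription already in flight
      return;
    }
    const current: ToyObserver<T>[] = [observer];
    observers = current;
    source({
      next: (value) => current.slice().forEach((o) => o.next(value)),
      error: (err) => {
        observers = null; // reset: a later subscriber re-runs the source
        current.slice().forEach((o) => o.error(err));
      },
    });
  };
}

// Hypothetical source: counts its runs and fails only when flushed,
// standing in for an HTTP request that errors asynchronously.
let runs = 0;
const pending: Array<() => void> = [];
const source: ToySource<number> = (observer) => {
  runs += 1;
  const run = runs;
  pending.push(() => observer.error(new Error(`failure from run ${run}`)));
};
const flush = (): void => {
  const fail = pending.shift();
  if (fail) fail();
};

const received: string[] = [];
const record = (name: string): ToyObserver<number> => ({
  next: () => {},
  error: (err) => received.push(`${name}: ${(err as Error).message}`),
});

const shared = shareWithResetOnError(source);
shared(record("A"));
shared(record("B")); // joins A's subscription; the source has run once
flush();             // A and B receive the same error from run 1
shared(record("C")); // arrives after the error: the source runs again
flush();
// runs is now 2; received holds A and B with the run-1 error, then C with the run-2 error
```

In this sketch A and B see one and the same failure, while C causes a second run of the source, which is the situation where a second request and a second error would show up.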
What you could do, to prevent the ReplaySubject from receiving error notifications while still letting its subscribers receive them as they are, is use the materialize and dematerialize operators:
const courses$ = http$
  .pipe(
    map(res => res['payload']), // map before materialize(), while values are still raw responses
    materialize(), // Everything (errors included) becomes a `next` notification
    shareReplay(),
    dematerialize() // Back to the original event
  );
With this approach, if a registered subscriber receives the error notification, it will be unsubscribed, meaning that it will also be removed from the ReplaySubject's list of subscribers. But the ReplaySubject will still be there and will not be replaced when later subscribers arrive.
Also, I think this is pretty redundant:
catchError(err => throwError(err));
This is expected behavior: when an observable throws an error, shareReplay() will re-subscribe to (re-execute) the source for the next subscriber.
You can try this to verify:
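import { defer, throwError } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// 'run' is logged once per subscription to the source
const a = defer(() => {
  console.log('run');
  return throwError(new Error('Error'));
}).pipe(shareReplay());
a.subscribe(console.log, console.error, () => console.log('complete'));
a.subscribe(console.log, console.error, () => console.log('complete'));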
https://stackblitz.com/edit/typescript-jutfxe
If you want the observable to share the error without executing the source again, use publishReplay(1), refCount() instead.
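The idea of sharing the error without re-executing can be sketched with a dependency-free toy model that stores the error and replays it to late subscribers. This is only an illustration of that idea, not how publishReplay or refCount are implemented; ToyObserver, ToySource, and shareKeepingError are hypothetical names made up for this sketch.

```typescript
// Toy model for illustration only; this is NOT how publishReplay/refCount work internally.
type ToyObserver<T> = { next: (value: T) => void; error: (err: unknown) => void };
type ToySource<T> = (observer: ToyObserver<T>) => void;

// Runs the source at most once. If it fails, the error is stored and
// replayed to every later subscriber instead of running the source again.
function shareKeepingError<T>(source: ToySource<T>): ToySource<T> {
  const observers: ToyObserver<T>[] = [];
  let started = false;
  let failure: { err: unknown } | null = null;
  return (observer) => {
    if (failure !== null) {
      observer.error(failure.err); // replay the stored error; no new run
      return;
    }
    observers.push(observer);
    if (started) return;
    started = true;
    source({
      next: (value) => observers.slice().forEach((o) => o.next(value)),
      error: (err) => {
        failure = { err };
        observers.splice(0).forEach((o) => o.error(err));
      },
    });
  };
}

// Hypothetical source that counts its runs and fails immediately.
let runs = 0;
const failing: ToySource<number> = (observer) => {
  runs += 1;
  observer.error(new Error(`failure from run ${runs}`));
};

const received: string[] = [];
const record = (name: string): ToyObserver<number> => ({
  next: () => {},
  error: (err) => received.push(`${name}: ${(err as Error).message}`),
});

const shared = shareKeepingError(failing);
shared(record("A")); // runs the source, which fails
shared(record("B")); // gets the stored error; the source is not run again
// runs stays at 1, and both A and B received the error from run 1
```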