We have a function that returns a stream from the backend as an Observable. However, we would also like to push values into that observable ourselves, so that changes show up before they are applied in the backend. To do so, I tried returning a Subject instead, but the connection stays open after we unsubscribe.
In other words, in the code below, we would like console.log(i) not to start before we subscribe to the subject, and to stop when we unsubscribe from it:
import { ReplaySubject, Observable, interval } from 'rxjs';
import { tap } from 'rxjs/operators';

function test() {
  const obs = interval(1000).pipe(tap(i => console.log(i)));
  const subj = new ReplaySubject(1);
  obs.subscribe(subj);
  return subj;
}

const subject = test();
subject.next('TEST');

const subscription = subject.pipe(
  tap(i => console.log('from outside ' + i))
).subscribe();

setTimeout(_ => subscription.unsubscribe(), 5000);
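You should not subscribe inside test: obs.subscribe(subj) starts the interval immediately, and nothing ever unsubscribes that inner subscription, so it keeps running after you unsubscribe from the subject. I guess you want to create an Observable and a Subject and merge them; you would then have to return both separately: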
return [subj, merge(subj, obs)];
and then
const [subject, obs] = test();
subject.next('TEST');
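Put together, the "return both" variant might look like the sketch below. It assumes the backend stream is passed in as a parameter (the name `source$` is hypothetical, chosen only for illustration), so that `test` never calls `subscribe` itself.

```typescript
import { Observable, ReplaySubject, Subject, interval, merge } from 'rxjs';
import { tap } from 'rxjs/operators';

// `source$` is a hypothetical stand-in for the backend stream.
function test<T>(source$: Observable<T>): [Subject<T>, Observable<T>] {
  const subject = new ReplaySubject<T>(1);
  // No subscribe() here: merge only connects to source$ once the
  // returned observable itself is subscribed.
  return [subject, merge(subject, source$)];
}

const [subject, obs] = test<number | string>(
  interval(1000).pipe(tap(i => console.log(i)))
);
subject.next('TEST'); // buffered by the ReplaySubject until someone subscribes

const subscription = obs
  .pipe(tap(i => console.log('from outside ' + i)))
  .subscribe();

// Unsubscribing from the merged stream also tears down the interval.
setTimeout(() => subscription.unsubscribe(), 5000);
```

The subject is used for pushing and the merged observable for listening; subscribing to the subject alone would not include the backend values.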
However, I would rather pass the subject in as a parameter:
import { ReplaySubject, interval, merge } from 'rxjs';
import { tap } from 'rxjs/operators';

function test(subject) {
  // merge is lazy: the interval only starts when the result is subscribed
  // and stops when that subscription is unsubscribed.
  return merge(
    interval(1000).pipe(tap(i => console.log(i))),
    subject
  );
}

const subject = new ReplaySubject(1);
const obs = test(subject);
subject.next('TEST');

const subscription = obs.pipe(
  tap(i => console.log('from outside ' + i))
).subscribe();

setTimeout(_ => subscription.unsubscribe(), 5000);
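To check that the source really starts on subscribe and stops on unsubscribe, one option is to swap the interval for a hand-made cold observable that counts its live subscriptions. This is only a sketch: `backend$` and `activeConnections` are hypothetical names, not part of the original code.

```typescript
import { Observable, ReplaySubject, merge } from 'rxjs';

// Hypothetical stand-in for the backend stream; counts live subscriptions.
let activeConnections = 0;
const backend$ = new Observable<string>(subscriber => {
  activeConnections++;
  subscriber.next('from backend');
  // Teardown runs when the subscriber unsubscribes.
  return () => { activeConnections--; };
});

function test(subject: Observable<string>): Observable<string> {
  return merge(backend$, subject);
}

const subject = new ReplaySubject<string>(1);
const obs = test(subject);
subject.next('TEST');
const before = activeConnections; // nothing has subscribed to backend$ yet

const received: string[] = [];
const subscription = obs.subscribe(v => received.push(v));
const during = activeConnections; // backend$ is connected now

subscription.unsubscribe();
const after = activeConnections; // teardown has run

console.log({ before, during, after, received });
```

If the counter were still above zero after unsubscribing, something would be holding its own subscription to the source, which is what happened with obs.subscribe(subj) in the question.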