
How to properly forkJoin a list of observables or their subscriptions?

I have a very large, ordered list of observables that need to run in parallel. When each observable returns, it appends its result to a behaviour subject, which is how the results are passed on. However, I need a specific function to be called once they have all completed.

The observables each download an image (and related metadata) from an API. The requests need to be executed as fast as possible, and I need to work with each result as it is emitted and emit an empty value when all the observables have finished. This means that the observables should be executed in parallel.

The original implementation, without a callback when they complete.

const requests: Observable<ImageResponse>[] = getRequests();

requests.forEach(obs => obs.subscribe(res => {
    const currentImages = this.$images.value;
    currentImages.push(res);
    this.$images.next(currentImages);
}));

In order to implement the callback when all requests have completed, I have tried the following.

const requests: Observable<ImageResponse>[] = getRequests();
const finishedTracker = new Subject<void>();

requests.forEach(obs => obs.subscribe(res => {
    const currentImages = this.$images.value;
    currentImages.push(res);
    this.$images.next(currentImages);
}));

forkJoin(requests).subscribe(() => {
    finishedTracker.next();
    finishedTracker.complete();
    console.log('requests done');
});

This works, but it seems strange to me that I need to split up the forkJoin and the subscriptions to the requests. Is there a better way to implement this functionality? I looked at mergeMap as well, but was not able to make it work.

Edit: Based on the comments, I have realised that subscribing twice means making the requests twice. Therefore I have attempted another implementation.

from(requests).pipe(
    mergeMap(o => {
        o.subscribe(res => {
            const currentImages = this.$images.value;
            currentImages.push(res);
            this.$images.next(currentImages);
        });
        return o;
    }, 10)
).subscribe(() => {
    finishedTracker.next();
    console.log('requests done');
})

I did not use the result from forkJoin because, as far as I understand, it gives me the results of all the requests at once, so it has to wait for them all to finish. Since each request is relatively fast, but there are often hundreds of them, I need their individual results passed to the behaviour subject as soon as each request finishes.

Edit 2: The solution I went with.
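The difference can be seen in a minimal sketch. It assumes synchronous `of` observables as hypothetical stand-ins for the image requests, and the variable names are illustrative only. forkJoin emits a single array after every source has completed, whereas merging the same sources emits each result on its own.

```typescript
import { Observable, forkJoin, from, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Hypothetical stand-ins for the real requests: each emits one value and completes.
const sampleRequests: Observable<string>[] = [of('a'), of('b'), of('c')];

// forkJoin waits for all sources to complete, then emits one array of last values.
const forkJoinEmissions: string[][] = [];
forkJoin(sampleRequests).subscribe(all => forkJoinEmissions.push(all));

// Merging forwards every value as soon as its source emits it.
const mergedEmissions: string[] = [];
from(sampleRequests).pipe(
    mergeMap(request => request)
).subscribe(one => mergedEmissions.push(one));
```

Here forkJoin produces one emission containing all three values, while the merged stream produces three separate emissions.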

from(requests).pipe(
    mergeMap(request => request, 10),
    scan<ImageResponse, ImageResponse[]>((all, current, index) => {
        all = all.concat(current);
        this.$images.next(all);
        return all;
    }, [])
).subscribe({
    complete: () => {
        finishedTracker.next();
        console.log('requests done');
    }
});
tgm asked Dec 13 '25 13:12



1 Answer

It's not necessary to subscribe inside of your mergeMap. In fact, as others have pointed out, it's causing a double subscription since mergeMap internally subscribes to the observable returned by the function you pass to it.
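The double subscription can be observed with a small sketch. It assumes a cold observable built with `defer` as a hypothetical stand-in for an HTTP request; the counter and variable names are not from the original code.

```typescript
import { Observable, defer, from, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Hypothetical stand-in for a request: the defer factory runs once per
// subscription, so the counter records how often the "request" executes.
let executions = 0;
const fakeRequest: Observable<string> = defer(() => {
    executions++;
    return of('image');
});

// Subscribing manually inside mergeMap and also returning the observable
// executes it twice: once for the manual subscribe, once for mergeMap.
from([fakeRequest]).pipe(
    mergeMap(o => {
        o.subscribe();
        return o;
    })
).subscribe();
const executionsWithInnerSubscribe = executions;

// Returning the observable alone leaves the single subscription to mergeMap.
executions = 0;
from([fakeRequest]).pipe(
    mergeMap(o => o)
).subscribe();
const executionsWithoutInnerSubscribe = executions;
```

With a real HTTP observable, each extra subscription would typically mean an extra network request.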

To handle the responses as they occur you can simply use a pipe and add your handling logic inside. Since you are essentially doing a side effect (something that doesn't modify the output of the current stream), using the tap operator is appropriate:

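from(requests).pipe(
    // mergeMap subscribes to the returned observable itself, so no manual subscribe is needed
    mergeMap(o => o.pipe(
        // Side effect only: push each result into the BehaviorSubject as it arrives
        tap(res => {
            const currentImages = this.$images.value;
            currentImages.push(res);
            this.$images.next(currentImages);
        })
    ), 10)
).subscribe(() => {
    // Note: a function passed to subscribe() is the next callback, so this runs once per result
    finishedTracker.next();
    console.log('requests done');
});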

While this will work, it looks like you are overcomplicating the observable flow. I'm not exactly sure of your use case, but I'd guess the Subjects aren't really needed at all. If your goal is to emit a cumulative array of results as they are processed, you can use scan for this without involving any Subject or BehaviorSubject. To run some logic when all requests have completed, you can pass a partial Observer that specifies only the complete callback (instead of the next callback, which is implicitly used when you pass a function as an argument to subscribe()):

from(requests).pipe(
    mergeMap(request => request, 10),
    scan((all, current) => all.concat(current), [])
).subscribe({
    complete: () => console.log('requests done')
});
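For reference, here is a self-contained sketch of the same pattern that can be run in isolation. The `ImageResponse` shape and the `of`-based requests are assumptions made for illustration, since the real types and requests are not shown in the question; the seed is typed explicitly so the accumulator's array type is clear to the compiler.

```typescript
import { Observable, from, of } from 'rxjs';
import { mergeMap, scan } from 'rxjs/operators';

// Hypothetical response shape; the real ImageResponse is not shown in the post.
interface ImageResponse { id: number; }

// Stand-ins for the real requests: each emits one response and completes.
const demoRequests: Observable<ImageResponse>[] = [1, 2, 3].map(id => of({ id }));

const snapshots: ImageResponse[][] = []; // every cumulative array that was emitted
let done = false;                        // set once all requests have completed

from(demoRequests).pipe(
    mergeMap(request => request, 10), // at most 10 requests in flight at a time
    scan(
        (all: ImageResponse[], current: ImageResponse) => all.concat(current),
        [] as ImageResponse[]
    )
).subscribe({
    next: all => snapshots.push(all),   // runs once per result, with the array so far
    complete: () => { done = true; }    // runs once, after the last request finishes
});
```

Because `concat` returns a new array each time, every emitted snapshot is a distinct array rather than a mutated shared one.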

EDIT: As pointed out by @AdrianBrand, it's more concise to use merge instead of from/mergeMap:

merge(...requests, 10).pipe(
    scan((all, current) => all.concat(current), [])
).subscribe({
    complete: () => console.log('requests done')
})
BizzyBob answered Dec 16 '25 22:12



