
RxJS: Batch requests and share response

Let's imagine I have a function fetchUser which takes a userId as a parameter and returns an observable of a user.

As I am calling this method often, I want to batch the IDs and perform one request with multiple IDs instead.

Here my troubles began...

I can't find a solution to do that without sharing an observable between the different calls of fetchUser.

import { Subject, from } from "rxjs"
import { bufferTime, mergeMap, map, toArray, filter, take, share } from "rxjs/operators"

const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
    map((userId) => ({ id: userId, name: "George" })),
    toArray(),
)

const userToFetch$ = new Subject<string>()

const fetchedUser$ = userToFetch$.pipe(
    bufferTime(1000),
    mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
    share(),
)

const fetchUser = (userId: string) => {
    const observable = fetchedUser$.pipe(
        map((users) => users.find((user) => user.id === userId)),
        filter((user) => !!user),
        take(1),
    )
    userToFetch$.next(userId)
    return observable
}

But that's ugly and it has several problems:

  • If I unsubscribe from the observable returned by fetchUser before the bufferTime timer has finished, it doesn't prevent the fetch of the user.
  • If I unsubscribe from all the observables returned by fetchUser before the fetch of the batch has finished, it doesn't cancel the request.
  • Error handling is more complex.
  • etc.

More generally: I don't know how to solve problems that require sharing resources using RxJS. It's difficult to find advanced examples of RxJS.

antoinestv asked Oct 21 '25 15:10


2 Answers

What you have is good, but as with everything RxJS, the devil is in the details.

Issues

  1. The mergeMap
        mergeMap((userIds) => functionThatSimulateAFetch(userIds)),

This is where you first go wrong. By using a mergeMap here, you are making it impossible to tell apart the "stream of requests" from the "stream returned by a single request":

  • You are making it near impossible to unsubscribe from an individual request (to cancel it)
  • You are making it impossible to handle errors
  • It falls apart if your inner observable emits more than once.

Rather, what you want is to emit individual BatchEvents via a normal map (producing an observable of observables), and to switchMap/mergeMap those after the filtering.
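To make the difference concrete, here is a minimal sketch. It is not the asker's code: fakeFetch, keyBatch$ and the "bad" key are hypothetical, and keyBatch$ simply stands in for the output of bufferTime. It compares flattening inside the pipeline with handing out the inner observable through a plain map.

```typescript
import { Observable, Subject, of, throwError } from "rxjs";
import { map, mergeMap } from "rxjs/operators";

type BatchEvent = { keys: string[]; values: Observable<string[]> };

// Hypothetical request: fails whenever the batch contains the key "bad".
const fakeFetch = (keys: string[]): Observable<string[]> =>
  keys.indexOf("bad") !== -1
    ? throwError(new Error("batch failed"))
    : of(keys.map(key => key.toUpperCase()));

// Stands in for the output of bufferTime: one array of keys per window.
const keyBatch$ = new Subject<string[]>();

// Flattened inside the pipeline: one failing request errors the whole stream.
const flattened$ = keyBatch$.pipe(mergeMap(keys => fakeFetch(keys)));

// Plain map: the request observable is handed out, not subscribed.
const nested$ = keyBatch$.pipe(
  map((keys): BatchEvent => ({ keys, values: fakeFetch(keys) })),
);

const flatLog: string[] = [];
flattened$.subscribe({
  next: values => flatLog.push(values.join(",")),
  error: (err: Error) => flatLog.push("error: " + err.message),
});

const nestedLog: string[] = [];
nested$.subscribe(event => {
  // Each recipient subscribes to its own batch and handles its own error.
  event.values.subscribe({
    next: values => nestedLog.push(values.join(",")),
    error: (err: Error) => nestedLog.push("error: " + err.message),
  });
});

keyBatch$.next(["bad"]);
keyBatch$.next(["a", "b"]);
```

After the two emissions, flatLog only holds the error, because the flattened stream is dead and never sees the second batch, while nestedLog holds the error for the first batch followed by "A,B" for the second.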

  2. Side effects when creating an observable & Emitting before subscribing
    userToFetch$.next(userId)
    return observable

Don’t do this. An observable by itself does not actually do anything. It’s a "blueprint" for a sequence of actions that happen when you subscribe to it. By emitting here, you only trigger a batch action once, at observable creation, and things break as soon as you get multiple or delayed subscriptions.

Rather, you want to create an observable from defer that emits to userToFetch$ on every subscription.

Even then, you’ll want to subscribe to your observable before emitting to userToFetch$: if you aren’t subscribed, your observable is not listening to the subject, and the event will be lost. You can do this in a defer-like observable.
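A minimal sketch of the ordering problem, under a few assumptions: key$, flush$, emitFirst and subscribeFirst are hypothetical names, and flush$ replaces the bufferTime timer with a manual trigger so the behaviour is deterministic. The merge variant is just one way to get "subscribe first, then emit"; it relies on merge subscribing to its sources in order.

```typescript
import { EMPTY, Observable, Subject, defer, merge } from "rxjs";
import { buffer, filter, first, share } from "rxjs/operators";

const key$ = new Subject<string>();
const flush$ = new Subject<void>(); // manual stand-in for the bufferTime timer

const batch$: Observable<string[]> = key$.pipe(
  buffer(flush$),
  filter(keys => keys.length > 0),
  share(),
);

// Emits the key on every subscription, but BEFORE subscribing to batch$.
// For the first subscriber, share() is not yet connected to key$: the key is lost.
const emitFirst = (key: string): Observable<string[]> =>
  defer(() => {
    key$.next(key);
    return batch$;
  }).pipe(first(keys => keys.indexOf(key) !== -1));

// Subscribes to batch$ first, then emits the key. The second source of the
// merge only performs the side effect and completes immediately.
const subscribeFirst = (key: string): Observable<string[]> =>
  merge(
    batch$,
    defer(() => {
      key$.next(key);
      return EMPTY;
    }),
  ).pipe(first(keys => keys.indexOf(key) !== -1));

const lost: string[][] = [];
const early = emitFirst("a").subscribe(keys => lost.push(keys));
flush$.next(); // the window closes, but "a" was never buffered
early.unsubscribe();

const received: string[][] = [];
subscribeFirst("b").subscribe(keys => received.push(keys));
flush$.next(); // the window closes with "b" in it
```

Here lost stays empty while received ends up as [["b"]]. Both variants emit on every subscription rather than at creation; only the second one is listening when the key goes out.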

Solution

It is short and not very different from your code, but structure it like this:

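import { Observable, Subject } from "rxjs";
import { bufferTime, first, map, share, switchMap } from "rxjs/operators";

const BUFFER_TIME = 1000;

// Assumed shapes: the batch endpoint returns the users keyed by ID
type User = { id: string, name: string };
type Users = { [id: string]: User };
type BatchEvent = { keys: Set<string>, values: Observable<Users> };

// Assumed: a service whose get() fetches several users in a single request
declare const userService: { get(ids: string[]): Observable<Users> };

/** The incoming keys */
const keySubject = new Subject<string>();

/** One BatchEvent per buffer window, shared by all callers */
const requests: Observable<BatchEvent> =
  keySubject.asObservable().pipe(
    bufferTime(BUFFER_TIME),
    map(keys => fetchBatch(keys)),
    share(),
  );

/** Returns a single User from an ID. Batches the request */
function get(userId: string): Observable<User> {
  console.log("Creating observable for:", userId);
  // The money observable. See "defer":
  // triggers a new subject event on subscription
  const observable = new Observable<BatchEvent>(observer => {
    const subscription = requests.subscribe(observer);
    // Emit *after* the subscription
    keySubject.next(userId);
    // Unsubscribing from the result also unsubscribes from the shared stream
    return subscription;
  });
  return observable.pipe(
    first(v => v.keys.has(userId)),
    // There is only 1 item, so any *Map will do here
    switchMap(v => v.values),
    map(v => v[userId]),
  );
}

function fetchBatch(args: string[]): BatchEvent {
  const keys = new Set(args); // Do not batch duplicates
  // share() so that all recipients of this batch reuse a single request
  const values = userService.get(Array.from(keys)).pipe(
    share(),
  );
  return { keys, values };
}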

This does exactly what you were asking, including:

  • Errors are propagated to the recipients of the batch call, but nobody else
  • If everybody unsubscribes from a batch, the observable is canceled
  • If everybody unsubscribes from a batch before the request is even fired, it never fires
  • The observable behaves like HttpClient: subscribing to the observable fires a new (batched) request for data. Callers are free to pipe shareReplay or whatever though. So no surprises there.
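The last point can be illustrated with a small sketch. The get below is a hypothetical stand-in that only counts subscriptions instead of batching anything; it shows what "subscribing fires a new request" means for a caller, and what piping shareReplay changes.

```typescript
import { Observable, defer, of } from "rxjs";
import { shareReplay } from "rxjs/operators";

let requestCount = 0;

// Hypothetical stand-in for get(userId): every subscription counts as one request.
const get = (userId: string): Observable<{ id: string }> =>
  defer(() => {
    requestCount++;
    return of({ id: userId });
  });

// Cold, like HttpClient: two subscriptions, two requests.
const cold$ = get("1");
cold$.subscribe();
cold$.subscribe();

// The caller opts into caching: two subscriptions, one request.
const cached$ = get("1").pipe(shareReplay(1));
cached$.subscribe();
cached$.subscribe();
```

At the end requestCount is 3: two requests for the cold observable and a single one for the cached observable.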

Here is a working stackblitz Angular demo: https://stackblitz.com/edit/angular-rxjs-batch-request

In particular, notice the behavior when you "toggle" the display: You’ll notice that re-subscribing to existing observables will fire new batch requests, and that those requests will cancel (or outright not fire) if you re-toggle fast enough.

Use case

In our project, we use this for Angular Tables, where each row needs to individually fetch additional data to render. This allows us to:

  • Chunk all the requests for a "single page", without needing any special knowledge of pagination
  • Potentially fetch multiple pages at once if the user paginates fast
  • Re-use existing results even if the page size changes

Limitations

I would not add chunking or rate limiting into this. Because the source observable is a dumb bufferTime, you run into issues:

  • The "chunking" will happen before the deduping. So if you have 100 requests for a single userId, you’ll end up firing several requests with only 1 element
  • If you rate limit, you’ll not be able to inspect your queue. So you may end up with a very long queue containing multiple identical requests.

This is a pessimistic point of view though. Fixing it would mean going all out with a stateful queue/batch mechanism, which is an order of magnitude more complex.
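The first limitation is easy to see with plain arrays. This is only an illustration with hypothetical helpers (chunk, dedupe) and an arbitrary chunk size of 10, not something taken from the solution above:

```typescript
// Cuts a list into consecutive chunks of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Removes duplicates while keeping the first occurrence of each item.
const dedupe = <T>(items: T[]): T[] => Array.from(new Set(items));

// What a size-limited buffer followed by a Set does: chunk first, dedupe later.
const chunkThenDedupe = (ids: string[], size: number): string[][] =>
  chunk(ids, size).map(part => dedupe(part));

// What you would actually want: dedupe the whole queue, then chunk.
const dedupeThenChunk = (ids: string[], size: number): string[][] =>
  chunk(dedupe(ids), size);

// 100 requests for the same user.
const queued: string[] = new Array<string>(100).fill("user-1");

const naive = chunkThenDedupe(queued, 10);
const wanted = dedupeThenChunk(queued, 10);
```

With these inputs, naive contains 10 batches of one ID each, while wanted contains a single batch. Getting the second behaviour out of an observable pipeline requires seeing the whole queue before cutting it, which is where the stateful mechanism comes in.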

Paul Féraud answered Oct 23 '25 05:10


I think @Biggy is right.

This is the way I understand the problem and what you want to achieve:

  1. There are different places in your app where you want to fetch users
  2. You do not want to fire a fetch request every time; rather, you want to buffer the requests and send them at a certain interval, let's say 1 second
  3. You want to be able to cancel a certain buffer, so that no request to fetch a batch of users is fired for that 1 second interval
  4. At the same time, if somebody, let's call it Code at Position X, has asked for a User and just a few milliseconds later somebody else, i.e. Code at Position Y, cancels the entire batch of requests, then Code at Position X has to receive some sort of answer, let's say a null
  5. Moreover, you may want to be able to ask to fetch a User and then change your mind within the interval of the buffer time, and prevent this User from being fetched (I am far from sure this is really something you want, but it seems to emerge from your question)

If this is all true, then you probably have to have some sort of queuing mechanism, as Buggy suggested.

There may be many implementations of such a mechanism.
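As one possible starting point, and purely as an assumption about what such a mechanism could look like, here is a sketch of the stateful part only: a queue that counts interest per user ID, lets a caller withdraw before the interval ends (point 5), and is drained once per interval. PendingUserQueue, enqueue and drain are hypothetical names; the timer and the actual fetch are left out.

```typescript
class PendingUserQueue {
  // Number of callers currently interested in each user ID.
  private readonly interest = new Map<string, number>();
  // Incremented on every drain, so that stale withdrawals can be ignored.
  private generation = 0;

  /** Registers interest in a user and returns a function that withdraws it. */
  enqueue(userId: string): () => void {
    const enqueuedAt = this.generation;
    this.interest.set(userId, (this.interest.get(userId) || 0) + 1);
    let withdrawn = false;
    return () => {
      // Ignore repeated calls and calls made after the batch was drained.
      if (withdrawn || enqueuedAt !== this.generation) return;
      withdrawn = true;
      const remaining = (this.interest.get(userId) || 0) - 1;
      if (remaining > 0) {
        this.interest.set(userId, remaining);
      } else {
        this.interest.delete(userId);
      }
    };
  }

  /** Called once per interval: returns the IDs still wanted and empties the queue. */
  drain(): string[] {
    const ids = Array.from(this.interest.keys());
    this.interest.clear();
    this.generation++;
    return ids;
  }
}

const queue = new PendingUserQueue();
const cancelA = queue.enqueue("a");
queue.enqueue("b");
const cancelOneB = queue.enqueue("b");
cancelA();    // nobody wants "a" anymore
cancelOneB(); // one caller still wants "b"
const firstBatch = queue.drain();
```

In this run firstBatch is ["b"]: "a" was withdrawn by its only caller, while "b" still has one interested caller. Because the queue is inspectable state, the IDs are already unique when the batch is cut.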

Picci answered Oct 23 '25 04:10
