
RxJS: recursively call an API until all items are fetched

Tags: angular, rxjs

I need to call an API and fetch all items from an endpoint. However, we can only fetch 100 items at a time.

The response looks like this:

{
    elements: Array<any>,
    totalCount: number,
}

and the endpoint would be /api/items?drop=${drop}&take=100 with drop being used to paginate.

I think this would involve the scan and takeWhile operators somewhere. Here is what I have so far:

    const subject = new Subject();
    const rec = subject.pipe(
        scan((acc: number, curr: number) => (curr + 100), 0),
        switchMap(drop => this.http.get(`api/items?drop=${drop}&take=100`)),
        takeWhile((r: any) => r.elements.length === 100),
        tap(r => subject.next())
    );
Ced asked Sep 17 '25 11:09



2 Answers

You don't need to know how many elements will be returned, just a number of pages that it will never reach :)

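// range comes from 'rxjs'; concatMap and takeWhile from 'rxjs/operators'
const take = 100;

range(0, 1000000).pipe(
  concatMap(page => this.http.get(`api/items?drop=${page * take}&take=${take}`)),
  // the inclusive flag (RxJS 6.4+) also emits the final, shorter page before completing
  takeWhile((results: any) => results.elements.length === take, true)
)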

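At first I thought using range as the source would fire all the requests at once and defeat the purpose of paging, but concatMap throttles automatically: it queues the page numbers and subscribes to the next request only after the previous one completes.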

From learnrxjs.io/operators/transformation/concatmap

Note the difference between concatMap and mergeMap. Because concatMap does not subscribe to the next observable until the previous completes, the value from the source delayed by 2000ms will be emitted first. Contrast this with mergeMap which subscribes immediately to inner observables, the observable with the lesser delay (1000ms) will emit, followed by the observable which takes 2000ms to complete.

Here's a test of concatMap, which shows that getPage is not called for the next page until the previous observable completes:

const take = 100;
const getPage = (page) => {
  console.log('reading page', page);
  return Rx.Observable.of(page).delay(1000);
}

Rx.Observable.range(0,3)
  .concatMap(getPage)
  .subscribe(results => console.log('concatMap results', results));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.js"></script>

Contrast this with mergeMap, which makes all calls to getPage immediately:

const take = 100;
const getPage = (page) => {
  console.log('reading page', page);
  return Rx.Observable.of(page).delay(1000);
}

Rx.Observable.range(0,3)
  .mergeMap(getPage)
  .subscribe(results => console.log('mergeMap results', results));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.js"></script>
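
Putting the first snippet of this answer together with a fake endpoint gives a runnable end-to-end sketch. Everything outside the three operators is an assumption for illustration: `fakeGetPage` is a hypothetical stand-in for `this.http.get(...)` backed by 250 in-memory items, `maxPages` is an arbitrary bound, and the imports assume RxJS 6.4 or later (needed for the inclusive flag of `takeWhile`).

```typescript
import { Observable, of, range } from 'rxjs';
import { concatMap, delay, reduce, takeWhile } from 'rxjs/operators';

// Same shape as the response described in the question.
interface Page {
  elements: number[];
  totalCount: number;
}

type Result = { items: number[]; drops: number[] };

const take = 100;
const maxPages = 1000; // assumed upper bound that the real page count never reaches
const store = Array.from({ length: 250 }, (_, i) => i); // fake backend data

// Hypothetical stand-in for this.http.get(`api/items?drop=${drop}&take=${take}`).
// It records each requested offset so the request order can be inspected.
function fakeGetPage(drop: number, log: number[]): Observable<Page> {
  log.push(drop);
  const page: Page = { elements: store.slice(drop, drop + take), totalCount: store.length };
  return of(page).pipe(delay(10)); // simulate network latency
}

function fetchAllItems(): Promise<Result> {
  const drops: number[] = [];
  const items$ = range(0, maxPages).pipe(
    // concatMap waits for each request to complete before starting the next one
    concatMap(page => fakeGetPage(page * take, drops)),
    // stop at the first page that is not full; `true` keeps that last page
    takeWhile((res: Page) => res.elements.length === take, true),
    // merge all pages into a single array, emitted once on completion
    reduce((acc: number[], res: Page) => acc.concat(res.elements), [] as number[])
  );
  return new Promise<Result>((resolve, reject) => {
    items$.subscribe({ next: items => resolve({ items, drops }), error: reject });
  });
}

fetchAllItems().then(({ items, drops }) =>
  console.log(`fetched ${items.length} items, requested offsets: ${drops.join(', ')}`)
);
```

With the 250 fake items, the requested offsets should be 0, 100 and 200: the short third page is still emitted and then ends the stream, so no fourth request is made.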
Richard Matsen answered Sep 20 '25 04:09



It seems I got it working by using an external variable, but it's not really pretty:

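const take = 100;
const subject = new BehaviorSubject(0);
// page index used for pagination, kept outside the stream
let i = 0;
const rec = subject.pipe(
    // convert the page index into the number of items to skip
    map((page: number) => page * take),
    switchMap(drop => this.http.get(`api/items?drop=${drop}&take=${take}`)),
    map((r: any) => r.elements),
    tap(items => {
        // a full page means there may be more items, so request the next page
        if (items.length === take) {
            subject.next(++i);
        }
    })
);
return rec;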
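
The external counter can be avoided with the `expand` operator, which feeds each emitted value back into a function that decides whether to request another page. This is a different technique from the one above, shown only as a hedged sketch: `fetchAll`, `PageWithDrop` and `fakeGetPage` are hypothetical names, and the fake endpoint stands in for `this.http.get(...)`.

```typescript
import { EMPTY, Observable, of } from 'rxjs';
import { delay, expand, map, reduce } from 'rxjs/operators';

interface Page<T> {
  elements: T[];
  totalCount: number;
}

// Pairs a response with the offset that produced it, so the next offset
// can be derived without any variable outside the stream.
interface PageWithDrop<T> {
  page: Page<T>;
  drop: number;
}

// getPage is whatever performs the request, e.g. a wrapper around this.http.get.
function fetchAll<T>(
  getPage: (drop: number) => Observable<Page<T>>,
  take = 100
): Observable<T[]> {
  const load = (drop: number): Observable<PageWithDrop<T>> =>
    getPage(drop).pipe(map(page => ({ page, drop })));

  return load(0).pipe(
    // A full page may be followed by more items; a short page ends the recursion.
    expand(({ page, drop }) => (page.elements.length === take ? load(drop + take) : EMPTY)),
    // Collect the elements of every page into one array, emitted on completion.
    reduce((acc: T[], { page }: PageWithDrop<T>) => acc.concat(page.elements), [] as T[])
  );
}

// Hypothetical fake endpoint with 250 items, for trying the function without a server.
const store = Array.from({ length: 250 }, (_, i) => `item-${i}`);
const fakeGetPage = (drop: number): Observable<Page<string>> =>
  of({ elements: store.slice(drop, drop + 100), totalCount: store.length }).pipe(delay(10));

fetchAll(fakeGetPage).subscribe(items => console.log(`fetched ${items.length} items`));
```

Unlike the BehaviorSubject version, this stream completes on its own after the last page, so a single subscription yields one array containing every item.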
Ced answered Sep 20 '25 05:09
