I'm learning Java with Android by creating a Hacker News reader app.
What I'm trying to do is:
1. Request /topstories, return Observable<List<Integer>>, emit when the request finishes.
2. Map each storyId to Observable<Story>.
3. Combine the results into one List<Story>, emitted when all requests finish.

And to the code:
  private Observable<Story> getStoryById(int articleId) {
    BehaviorSubject<Story> subject = BehaviorSubject.create();
    // calls subject.onNext on success
    JsonObjectRequest request = createStoryRequest(articleId, subject);
    requestQueue.add(request);
    return subject;
  }
  public Observable<ArrayList<Story>> getTopStories(int amount) {
    Observable<ArrayList<Integer>> topStoryIds = (storyIdCache == null)
      ? fetchTopIds()
      : Observable.just(storyIdCache);
    return topStoryIds
      .flatMap(id -> getStoryById(id))
      // some magic here
  }
Then we would use it like this:
getTopStories(20)
  .subscribe(stories -> ...)
You can try something like this:
Observable<List<Integer>> ids = getIdsObservable();
Single<List<Story>> listSingle =
            ids.flatMapIterable(list -> list)
            .flatMap(id -> getStoryById(id)).toList();
Then you can subscribe to that Single to get the List<Story>.
Please have a look at my solution. I changed your interface so that getStoryById() returns a Single, because it should only ever return one value. Then I created a Single request for each Story and subscribed to all of them with Single.zip. Zip executes the given lambda once all Singles have finished. One drawback is that all requests are fired at once. If you do not want this, I will update my post. Please also take into consideration that a flatMap-based solution like @elmorabea's, when used on a Flowable, will subscribe to the first 128 elements (BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));) and then to the next element whenever one finishes.
@Test
  void name() {
    Api api = mock(Api.class);
    when(api.getTopStories()).thenReturn(Flowable.just(Arrays.asList(new Story(1), new Story(2))));
    when(api.getStoryById(eq(1))).thenReturn(Single.just(new Story(888)));
    when(api.getStoryById(eq(2))).thenReturn(Single.just(new Story(888)));
    Flowable<List<Story>> listFlowable =
        api.getTopStories()
            .flatMapSingle(
                stories -> {
                  List<Single<Story>> collect =
                      stories
                          .stream()
                          .map(story -> api.getStoryById(story.id))
                          .collect(Collectors.toList());
                  // possibly not the best idea to subscribe to all singles at the same time
                  Single<List<Story>> zip =
                      Single.zip(
                          collect,
                          objects -> {
                            return Arrays.stream(objects)
                                .map(o -> (Story) o)
                                .collect(Collectors.toList());
                          });
                  return zip;
                });
    TestSubscriber<List<Story>> listTestSubscriber =
        listFlowable.test().assertComplete().assertValueCount(1).assertNoErrors();
    List<List<Story>> values = listTestSubscriber.values();
    List<Story> stories = values.get(0);
    assertThat(stories.size()).isEqualTo(2);
    assertThat(stories.get(0).id).isEqualTo(888);
    assertThat(stories.get(1).id).isEqualTo(888);
  }
  interface Api {
    Flowable<List<Story>> getTopStories();
    Single<Story> getStoryById(int id);
  }
  static class Story {
    private final int id;
    Story(int id) {
      this.id = id;
    }
  }
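The drawback mentioned above, that Single.zip fires every request at once, comes down to capping how many fetches are in flight while still collecting the results in order. As a library-free illustration, here is a minimal sketch using plain java.util.concurrent instead of RxJava. It is a swapped-in technique, not the method from my test above. BoundedFetch, fetchAll and the fetch parameter are hypothetical names, and Story is only a stand-in for the class above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.IntFunction;
import java.util.stream.Collectors;

// Hypothetical helper, not part of RxJava or of the code above.
final class BoundedFetch {

    // Stand-in for the Story class used in the answer.
    static final class Story {
        final int id;

        Story(int id) {
            this.id = id;
        }
    }

    /**
     * Runs fetch for every id with at most maxConcurrency calls in flight
     * and returns the stories in the same order as the ids.
     * maxConcurrency must be at least 1 (the thread pool rejects 0 or less).
     */
    static List<Story> fetchAll(List<Integer> ids, IntFunction<Story> fetch, int maxConcurrency) {
        // The fixed pool size is what bounds the number of concurrent fetches.
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        try {
            // Submit every fetch up front; tasks beyond the pool size wait in its queue.
            List<CompletableFuture<Story>> pending = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> fetch.apply(id), pool))
                .collect(Collectors.toList());

            // Join in submission order, so the result order matches the id order.
            return pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }
}
```

In RxJava 2 itself, the flatMap overload that takes a maxConcurrency argument should give a similar cap on concurrent inner subscriptions, though flatMap does not guarantee that results arrive in the order of the ids.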