To allow multiple iterations on the resulting stream from a CompletableFuture<Stream<String>> I am considering one of the following approaches:
Convert the resulting future to CompletableFuture<List<String>> through: teams.thenApply(st -> st.collect(toList()))
Convert the resulting future to Flux<String> with cache: Flux.fromStream(teams::join).cache();
Flux<T> is the implementation of Publisher<T> in project reactor.
Use case:
I would like to get a sequence with the premier league teams names (e.g. Stream<String>) from a data source which provides a League object with a Standing[] (based on football-data RESTful API, e.g. http://api.football-data.org/v1/soccerseasons/445/leagueTable). Using AsyncHttpClient and Gson we have:
CompletableFuture<Stream<String>> teams = asyncHttpClient .prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable") .execute() .toCompletableFuture() .thenApply(Response::getResponseBody) .thenApply(body -> gson.fromJson(body, League.class)); .thenApply(l -> stream(l.standings).map(s -> s.teamName)); To re-use the resulting stream I have two options:
1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList())) 2. Flux<String> res = Flux.fromStream(teams::join).cache() Flux<T> is less verbose and provides all that I need. Yet, is it correct to use it in this scenario?
Or should I use CompletableFuture<List<String>> instead? Or is there any other better alternative?
UPDATED with some thoughts (2018-03-16):
CompletableFuture<List<String>>:
List<String> will be collected in a continuation and when we need to proceed with the result of the future, maybe it is already completed.List<T>.Flux<String>:
.cache() and forward it to the next layer, which can take advantage of the reactive API, e.g. web flux reactive controller, e.g. @GetMapping(produces =MediaType.TEXT_EVENT_STREAM) public Flux<String> getTeams() {…} Flux<T> we have to wrap it in a cacheable Flux<T> (….cache()) which in turn will add overhead on the first traversal, because it has to store the resulting items in an internal cache. CompletableFuture<Stream<String>> teams = ...; Flux<String> teamsFlux = Mono.fromFuture(teams).flatMapMany(stream -> Flux.fromStream(stream)); Flux.fromStream(teams::join) is a code smell because it's blocking a thread to fetch the result from the CompletableFuture which is running on another thread.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With