Well, this is my first question in StackOverflow, but after several days fighting with RxJava
I am unable to find other solution, I have tried many things by my own, digging along the documentation and other posts but I am not sure how exactly do what I need. I have tried with several combinations of flatMap
, zip
, merge
and others, but always reaching to a dead end, and the closest achieved solution is the code below. I would appreciate any help or guidance.
I need a method that, given an input list, executes parallel calls with the different inputs of the list, and do not continue the execution until all the parallel calls have finished. It is also necessary to keep the results of the different executions for later uses (EDIT: In the same thread that started the execution).
public void parallelExecution(List<Integer> calls) {
List<String> results = new ArrayList<>();
logger.debug("Starting parallel executions");
Observable.fromIterable(calls)
.flatMap(val -> Observable.just(val).observeOn(Schedulers.io())
.doOnNext(item -> results.add(simpleAsync(item).toString())))
.subscribe(call -> logger.debug("Execution: {}", Thread.currentThread().getName()));
logger.debug("Ending parallel executions");
for (String x : results) {
logger.debug("Results: {}", x);
}
}
private Integer simpleAsync(Integer number) {
Integer result = number * number;
logger.info("Pre log {}: {}", Thread.currentThread().getName(), result);
try {
Thread.sleep(number * 500);
} catch (Exception e) {
}
logger.info("Post log {}: {}", Thread.currentThread().getName(), result);
return result;
}
The problem is that this code does not "waits" for the execution of the "simpleAsync" method, it finishes the execution with no "Results" logs (there are no results yet), and after that, the "Post log" traces appear executed in different threads:
Starting parallel executions
Ending parallel executions
Pre log RxCachedThreadScheduler-1: 1
Pre log RxCachedThreadScheduler-2: 4
Pre log RxCachedThreadScheduler-3: 9
Pre log RxCachedThreadScheduler-4: 16
Pre log RxCachedThreadScheduler-5: 25
Post log RxCachedThreadScheduler-1: 1
Execution: RxCachedThreadScheduler-1
Post log RxCachedThreadScheduler-2: 4
Execution: RxCachedThreadScheduler-2
Post log RxCachedThreadScheduler-3: 9
Execution: RxCachedThreadScheduler-3
Post log RxCachedThreadScheduler-4: 16
Execution: RxCachedThreadScheduler-4
Post log RxCachedThreadScheduler-5: 25
Execution: RxCachedThreadScheduler-5
If I delete the "observeOn" sentence, the method waits for the completion of the calls, but they are done sequentially (in the same thread):
Starting parallel executions
Pre log Default Executor-thread-9: 1
Post log Default Executor-thread-9: 1
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 4
Post log Default Executor-thread-9: 4
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 9
Post log Default Executor-thread-9: 9
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 16
Post log Default Executor-thread-9: 16
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 25
Post log Default Executor-thread-9: 25
Execution: Default Executor-thread-9
Ending parallel executions
Results: 1
Results: 4
Results: 9
Results: 16
Results: 25
Have you tried using zip?
public void parallelExecution(List<Integer> calls) {
logger.debug("Starting parallel executions");
// Create an iterable observables
List<Observable<Integer>> observables = calls.stream()
.map(i -> {
return Observable.fromCallable(() -> simpleAsync(i))
.subscribeOn(Schedulers.newThread());
})
.collect(Collectors.toList());
Observable.zip(observables, objects -> { // Zip observables
return Arrays.stream(objects)
.map(Object::toString)
.collect(Collectors.toList());
})
.doOnNext(results -> logger.debug("Ending parallel executions"))
.subscribe(results -> { // Subscribe to the result.
// Put your code that needs to "wait"
for (String x : results) {
logger.debug("Results: {}", x);
}
});
}
The result will look something like this:
Starting parallel executions
Pre log RxNewThreadScheduler-3: 9
Pre log RxNewThreadScheduler-1: 1
Pre log RxNewThreadScheduler-2: 4
Pre log RxNewThreadScheduler-4: 16
Pre log RxNewThreadScheduler-5: 25
Post log RxNewThreadScheduler-1: 1
Post log RxNewThreadScheduler-2: 4
Post log RxNewThreadScheduler-3: 9
Post log RxNewThreadScheduler-4: 16
Post log RxNewThreadScheduler-5: 25
Ending parallel executions
Results: 1
Results: 4
Results: 9
Results: 16
Results: 25
EDIT:
You can change the thread you want to listen to the result using observeOn
. If you want to subscribe from the calling thread, for example, you can change the code to something like this (see these SO answers):
final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
logger.debug("Starting parallel executions");
// Create an iterable observables
List<Observable<Integer>> observables = calls.stream()
.map(i -> {
return Observable.fromCallable(() -> simpleAsync(i))
.subscribeOn(Schedulers.newThread());
})
.collect(Collectors.toList());
Observable.zip(observables, objects -> { // Zip observables
return Arrays.stream(objects)
.map(Object::toString)
.collect(Collectors.toList());
})
.doOnNext(results -> logger.debug("Ending parallel executions"))
.observeOn(Schedulers.from(tasks::add)) // Add a scheduler with executor from the current thread
.subscribe(results -> { // Subscribe to the result.
// Put your code that needs to "wait"
for (String x : results) {
logger.debug("Results: {}", x);
}
});
try {
tasks.take().run();
} catch (InterruptedException e) {
e.printStackTrace();
}
I'd suggest that you're not thinking reactively enough:
public Single<List<String>> parallelExecution(List<Integer> calls) {
return Observable
.fromIterable(calls)
.flatMap(val -> Observable.fromCallable(() -> simpleAsync(val).toString())
.subscribeOn(Schedulers.io())
.toList();
}
.toList()
will collect all the results and provide a single items when the flatMap
completessubscribeOn
, not observeOn
. simpleAsync
returned a reactive object.parallelExecution
non-reactive, use something like blockingGet
.If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With