Follow-up question to https://stackoverflow.com/a/47136941/1776585
I can't get my integration handler to run on parallel threads when using Flux + split() + FluxMessageChannel.
Consider the following snippet:
// ...
.handle(message -> Flux.range(0, 10)
        .doOnNext(i -> LOG.info("> " + i))
        .subscribeOn(Schedulers.parallel()))
.split()
.channel(new FluxMessageChannel())
.handle(message -> LOG.info(" -> " + message.getPayload())))
// ...
All logs are output in one thread:
[ parallel-1] d.a.Application : > 0
[ parallel-1] d.a.Application : -> 0
[ parallel-1] d.a.Application : > 1
[ parallel-1] d.a.Application : -> 1
[ parallel-1] d.a.Application : > 2
[ parallel-1] d.a.Application : -> 2
[ parallel-1] d.a.Application : > 3
[ parallel-1] d.a.Application : -> 3
[ parallel-1] d.a.Application : > 4
[ parallel-1] d.a.Application : -> 4
[ parallel-1] d.a.Application : > 5
[ parallel-1] d.a.Application : -> 5
[ parallel-1] d.a.Application : > 6
[ parallel-1] d.a.Application : -> 6
[ parallel-1] d.a.Application : > 7
[ parallel-1] d.a.Application : -> 7
[ parallel-1] d.a.Application : > 8
[ parallel-1] d.a.Application : -> 8
[ parallel-1] d.a.Application : > 9
[ parallel-1] d.a.Application : -> 9
How can I force the processing to run on multiple threads?
I've tried using .parallel().runOn() on the Flux, but that only parallelizes fetching the data; the actual handling still runs on one thread.
I've also tried .publishOn(Schedulers.parallel()) on the Flux, with no effect.
Adding an ExecutorChannel or a Poller with an executor to the handler didn't help either.
This does the trick:
.channel(new FluxMessageChannel())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(message -> LOG.info(" -> " + message.getPayload())))
The messages consumed from the FluxMessageChannel are then handled in parallel, because the additional ExecutorChannel hands each one off to a thread from its executor.
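To illustrate why the extra executor hop changes the threading, here is a minimal JDK-only sketch of the same hand-off idea. It is an analogy, not Spring Integration's actual implementation: the class name ExecutorHandOffSketch, the run() method and the Result holder are all made up for this example, and a fixed-size pool stands in for the cached pool used above so that the number of worker threads is predictable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of Spring Integration or Reactor.
class ExecutorHandOffSketch {

    /** What one run observed: the handled payloads and the threads involved. */
    static final class Result {
        final List<Integer> handled;        // payloads, sorted for easy comparison
        final Set<String> handlerThreads;   // names of threads that ran the handler
        final String producerThread;        // name of the single emitting thread

        Result(List<Integer> handled, Set<String> handlerThreads, String producerThread) {
            this.handled = handled;
            this.handlerThreads = handlerThreads;
            this.producerThread = producerThread;
        }
    }

    static Result run(int items, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        Queue<Integer> handled = new ConcurrentLinkedQueue<>();
        Set<String> handlerThreads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(items);
        String producerThread = Thread.currentThread().getName();
        try {
            // This loop stands in for the single-threaded Flux + split() stage.
            for (int i = 0; i < items; i++) {
                final int payload = i;
                // Hand-off: the producer only submits; a pool thread runs the handler.
                pool.execute(() -> {
                    handlerThreads.add(Thread.currentThread().getName());
                    handled.add(payload);
                    done.countDown();
                });
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handlers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        List<Integer> sorted = new ArrayList<>(handled);
        Collections.sort(sorted);
        return new Result(sorted, handlerThreads, producerThread);
    }

    public static void main(String[] args) {
        Result r = run(10, 4);
        System.out.println("producer thread: " + r.producerThread);
        System.out.println("handled " + r.handled + " on threads " + r.handlerThreads);
    }
}
```

Note that once handling is spread over several threads, the order in which payloads are processed is no longer guaranteed, which is why the sketch sorts the results before returning them.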
I think what you are asking for amounts to a feature request to make the FluxMessageChannel configurable, so that subscribeOn(), publishOn() and similar operators could be set up there.
Feel free to raise a JIRA on the matter!