Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring Integration 5.0 + Project Reactor: controlling threads

Followup question for https://stackoverflow.com/a/47136941/1776585

I can't make my integration handler to run in parallel threads while using Flux + split() + FluxMessageChannel.

Consider the following snippet:

// ...
.handle(message -> Flux.range(0, 10)
    .doOnNext(i -> LOG.info("> " + i))
    .subscribeOn(Schedulers.parallel()))
.split()
.channel(new FluxMessageChannel())
.handle(message -> LOG.info(" -> " + message.getPayload())))
// ...

All logs are output in one thread:

[     parallel-1] d.a.Application    : > 0
[     parallel-1] d.a.Application    :  -> 0
[     parallel-1] d.a.Application    : > 1
[     parallel-1] d.a.Application    :  -> 1
[     parallel-1] d.a.Application    : > 2
[     parallel-1] d.a.Application    :  -> 2
[     parallel-1] d.a.Application    : > 3
[     parallel-1] d.a.Application    :  -> 3
[     parallel-1] d.a.Application    : > 4
[     parallel-1] d.a.Application    :  -> 4
[     parallel-1] d.a.Application    : > 5
[     parallel-1] d.a.Application    :  -> 5
[     parallel-1] d.a.Application    : > 6
[     parallel-1] d.a.Application    :  -> 6
[     parallel-1] d.a.Application    : > 7
[     parallel-1] d.a.Application    :  -> 7
[     parallel-1] d.a.Application    : > 8
[     parallel-1] d.a.Application    :  -> 8
[     parallel-1] d.a.Application    : > 9
[     parallel-1] d.a.Application    :  -> 9

How can I force the processing in multiple threads?

I've tried using .parallel().runOn() on the Flux, but that just makes fetching data parallel, but actual handling still runs on one thread.

I've also tried .publishOn(Schedulers.parallel()) on the Flux with no effect.

And also adding ExecutorChannel or a Poller with an executor to a handler didn't help.

like image 655
Mikhail Kadan Avatar asked Nov 23 '25 19:11

Mikhail Kadan


1 Answers

This does some trick:

.channel(new FluxMessageChannel())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(message -> LOG.info(" -> " + message.getPayload())))

And those messages consumed by the FluxMessageChannel is going to be paralled by that additional ExecutorChannel.

I think what you are asking is like a feature request to make the mentioned FluxMessageChannel configurable. And such a subscribeOn/publishOn etc. could be configured there.

Feel free to raise a JIRA on the matter!

like image 76
Artem Bilan Avatar answered Nov 26 '25 10:11

Artem Bilan



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!