I don't clearly understand from the documentation how the publishOn() operator (or observeOn() in RxJava) works in terms of thread affinity. I thought this operator guaranteed that any given subscriber would always be processed on the same thread, but the following example broke my understanding:
Flux<String> started = ep.publishOn(scheduler);
Flux<String> afterStateUpdate = started.doOnNext(e -> {
    System.out.println("STATE UPDATE : " + Thread.currentThread().getId());
    state.add(e);
}).share();

afterStateUpdate.subscribe();
ep.onNext("1");
ep.onNext("2");
ep.onNext("3");

afterStateUpdate.subscribe(e -> {
    System.out.println("MAIN SUBSCRIBER : " + Thread.currentThread().getId());
});
ep.onNext("4");
ep.onNext("5");
ep.onNext("6");
As a result, I see the following output:
STATE UPDATE : 12
STATE UPDATE : 12
STATE UPDATE : 12
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
This means the "state updater" was running on thread 12, but once the second subscriber subscribed, it started running on thread 13.
So the question is: how can I guarantee thread affinity for my subscribers in this case?
I have found only one way to be 100% sure that a subscriber always uses the same thread: create a pool of single-threaded schedulers like this:
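// The element type is Scheduler; Schedulers is only the factory class.
Scheduler[] schedulers = new Scheduler[10];
for (int i = 0; i < 10; i++) {
    // newSingle takes a thread name; each call creates its own single-threaded scheduler
    schedulers[i] = Schedulers.newSingle("single-" + i);
}

public Flux<T> wrapIntoSingleThread(Scheduler scheduler) {
    return this.flux.publishOn(scheduler);
}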
Then, for every subscriber, take one scheduler from this pool: wrapIntoSingleThread(schedulers[i++ % schedulers.length]).subscribe(...)
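To make the round-robin idea concrete without depending on Reactor, here is a minimal plain-JDK sketch. It is an analogy, not Reactor code: it uses single-threaded ExecutorService instances in place of Scheduler, and the names SingleThreadPoolSketch, assign and run are hypothetical and exist only for this illustration. Each subscriber is assigned one executor up front and every item for that subscriber is handed to that same executor, so it should only ever see one thread id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadPoolSketch {

    // Fixed pool of single-threaded executors; stands in for the Scheduler[] pool.
    private final ExecutorService[] workers;
    private int next = 0;

    public SingleThreadPoolSketch(int size) {
        workers = new ExecutorService[size];
        for (int i = 0; i < size; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Round-robin assignment: the caller keeps the executor it receives here.
    public synchronized ExecutorService assign() {
        return workers[next++ % workers.length];
    }

    // Stops accepting work and waits for queued tasks to finish.
    public void shutdown() {
        try {
            for (ExecutorService w : workers) {
                w.shutdown();
                w.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    // Sends `events` items to each subscriber and returns, per subscriber index,
    // the set of thread ids that handled its items.
    public static Map<Integer, Set<Long>> run(int poolSize, int subscribers, int events) {
        SingleThreadPoolSketch pool = new SingleThreadPoolSketch(poolSize);
        Map<Integer, Set<Long>> seen = new ConcurrentHashMap<>();
        List<ExecutorService> assigned = new ArrayList<>();
        for (int s = 0; s < subscribers; s++) {
            assigned.add(pool.assign());
            seen.put(s, ConcurrentHashMap.newKeySet());
        }
        for (int e = 0; e < events; e++) {
            for (int s = 0; s < subscribers; s++) {
                final int id = s;
                assigned.get(s).execute(
                        () -> seen.get(id).add(Thread.currentThread().getId()));
            }
        }
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        run(2, 3, 6).forEach((id, threads) ->
                System.out.println("subscriber " + id + " ran on threads " + threads));
    }
}
```

With a pool smaller than the number of subscribers, as in main above, some subscribers share a thread, but no subscriber moves between threads. That is the same trade-off as sharing schedulers[i++ % schedulers.length] among subscribers.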
If anyone knows another way to provide thread affinity, please correct me.