Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Operator "publishOn()" and thread affinity in Reactor Project

I don't clearly understand from documentation how publishOn() (or observeOn() in case of RxJava) operator works in terms of thread affinity. I thought that this operator guarantee that any subscriber will be processed in the same thread, but following example has broken my understanding:

Flux<String> started = ep.publishOn(scheduler);
Flux<String> afterStateUpdate = started.doOnNext(e -> {
    System.out.println("STATE UPDATE : " + Thread.currentThread().getId());
    state.add(e);
}).share();
afterStateUpdate.subscribe();
ep.onNext("1");
ep.onNext("2");
ep.onNext("3");

afterStateUpdate.subscribe(e -> {
    System.out.println("MAIN SUBSCRIBER : " + Thread.currentThread().getId());
});
ep.onNext("4");
ep.onNext("5");
ep.onNext("6");

As result I see following output:

STATE UPDATE : 12
STATE UPDATE : 12
STATE UPDATE : 12
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
STATE UPDATE : 13
MAIN SUBSCRIBER : 13

It means that "state updater" had been working in thread 12, but when second subscriber subscribed "state updater" started working in thread 13.

So, the question is how in this case I can guarantee thread affinity for my subscribers?

like image 905
dmitrievanthony Avatar asked Jan 16 '26 19:01

dmitrievanthony


1 Answers

I found only one way to be 100% sure that one subscriber uses the same thread. It's to create a pool of single schedulers like this:

Scheduler[] schedulers = new Schedulers[10];
for (int i = 0; i < 10; i++) {
  schedulers[i] = Schedulers.newSingle();
}

public Flux<T> wrapIntoSingleThread(Scheduler scheduler) {
  return this.flux.publishOn(scheduler);
}

And then for every subscriber take a one scheduler from this pool: wrapIntoSingleThread(schedulers[i++ % schedulers.length]).subscribe(...)

If anyone know other way to provide thread affinity - please correct me.

like image 68
dmitrievanthony Avatar answered Jan 19 '26 20:01

dmitrievanthony