Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Upgrading io.projectreactor version from 2.0.x to 3.0.4 - Using Spring framework

I have an issue while trying to make the upgrade.

Currently i'm using version 2.0.x, and in particular -

reactor.bus
reactor.rx.Stream
reactor.rx.Streams
reactor.core.processor.RingBufferProcessor
reactor.fn.Consumer

I'm using maven, and i have a single dependency regarding 'projectreactor' -

<groupId>io.projectreactor.spring</groupId>
<artifactId>reactor-spring-core</artifactId>

When upgrading to version 3.0.4.RELEASE, in order to keep using all the things i used before, i need to explicitly import -

<groupId>io.projectreactor</groupId>
<artifactId>reactor-bus</artifactId>

And

<groupId>io.projectreactor</groupId>
<artifactId>reactor-stream</artifactId>

but i'm still missing

reactor.core.processor.RingBufferProcessor
reactor.fn.Consumer

and i'm not sure what to do.

like image 564
Adam Berlin Avatar asked Nov 01 '25 05:11

Adam Berlin


2 Answers

reactor.rx.Stream -> reactor.core.publisher.Flux
reactor.rx.Streams -> reactor.core.publisher.Flux
reactor.rx.Promise -> reactor.core.publisher.Mono and reactor.core.publisher.MonoProcessor
reactor.core.processor.RingBufferProcessor -> reactor.core.publisher.TopicProcessor
reactor.fn.Consumer -> java.util.function.Consumer (Java 8)

There is no new spring module since spring 5 directly includes Reactor support with these new types.

As for reactor-bus : By design now all stream routes (Flux/Mono chains) are typed, so dynamic routing is not part of our features yet. Still there are alternative in a typed way, for instance :

ReplayProcessor<MyEvent> rp = ReplayProcessor.create();
Flux<MyEvent> interest1 = rp.filter(ev -> filterInterest1(ev));
Flux<MyEvent> interest2 = rp.filter(ev -> filterInterest2(ev));
Flux<MyEvent> interest1_2 = rp.filter(ev -> filterInterest1(ev) || filterInterest2(ev));

interest1.subscribe(doSomethingForInterest1);
interest2.subscribe(doSomethingForInterest2);
interest1_2.subscribe(doSomethingForInterest1_2);

rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react
rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react
rp.onNext(new MyEvent("interest2")); //subscriber 2 and 3 react
rp.onNext(new MyEvent("interest4")); // buffered until interest subscriber because ReplayProcessor

//shutdown/cleanup/close
rp.onComplete();

I have found this on github which seems to fit your needs

like image 166
Aviad Avatar answered Nov 03 '25 20:11

Aviad


reactor.fn.Consumer is replaced by Java 8 java.util.function.Consumer.

As for RingBufferProcessor you have to pick one of new processors all using ring buffer.

Dispatchers are now Schedulers that use Java's Executors under the hood.

like image 39
Yaroslav Stavnichiy Avatar answered Nov 03 '25 22:11

Yaroslav Stavnichiy



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!