I had been using RxJava so far but I am starting to play with reactor-core from projectreactor.io since it adheres to the reactive streams spec.
In the following test I create a hot Flux (ConnectableFlux) that generates random numbers. I connect() it immediately and it prefetches 256 values (I can see 258 actually them in the log). I wait 5 seconds to simulate that the subscriber won't subscribe until some time later.
After the main thread wakes up, the RnApp subscribes to the ConnectableFlux, randomNumberGenerator.subscribe(new RnApp());. Then RnApp.onSubscribe() is called and 10 elements are requested. After that, a java.lang.IllegalStateException: Queue full?! exception is raised (RnApp.onError() is invoked), why?
Subscriber:
public class RnApp implements Subscriber<Float>{
private Subscription subscription;
private List<Float> randomNumbers = new ArrayList<Float>();
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable err) {
err.printStackTrace();
}
@Override
public void onNext(Float f) {
if(this.randomNumbers.size()>=10){
this.subscription.cancel();
}else{
this.randomNumbers.add(f);
}
}
@Override
public void onSubscribe(Subscription subs) {
this.subscription = subs;
this.subscription.request(10);
}
}
Publisher test:
@Test
public void randomNumberReading() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
ConnectableFlux<Float> randomNumberGenerator = ConnectableFlux.<Float>create( (c) -> {
SecureRandom sr = new SecureRandom();
int i = 1;
while(true){
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("-----------------------------------------------------"+(i++));
c.onNext(sr.nextFloat());
}
}).log().subscribeOn(Computations.concurrent()).publish();
randomNumberGenerator.connect();
Thread.sleep(5000);
randomNumberGenerator.subscribe(new RnApp());
latch.await();
}
Log:
11:12:05.125 [main] DEBUG r.core.util.Logger$LoggerFactory - Using Slf4j logging framework
11:12:05.363 [concurrent-1] INFO reactor.core.publisher.FluxLog - onSubscribe(io.pivotal.literx.Part10SubscribeOnPublishOn$$Lambda$1/1586600255@29d4caeb)
11:12:05.371 [concurrent-1] INFO reactor.core.publisher.FluxLog - request(256)
-----------------------------------------------------1
11:12:06.000 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.39189225)
-----------------------------------------------------2
...
-----------------------------------------------------257
11:12:08.683 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.34729618)
-----------------------------------------------------258
11:12:08.697 [concurrent-1] INFO reactor.core.publisher.FluxLog - onNext(0.7729547)
java.lang.IllegalStateException: Queue full?!
at reactor.core.publisher.FluxPublish$State.onNext(FluxPublish.java:246)
at reactor.core.publisher.FluxSubscribeOn$SubscribeOnPipeline.onNext(FluxSubscribeOn.java:134)
at reactor.core.publisher.FluxLog$LoggerBarrier.doNext(FluxLog.java:130)
at reactor.core.subscriber.SubscriberBarrier.onNext(SubscriberBarrier.java:85)
at reactor.core.subscriber.SubscriberWithContext.onNext(SubscriberWithContext.java:92)
at io.pivotal.literx.Part10SubscribeOnPublishOn.lambda$1(Part10SubscribeOnPublishOn.java:132)
at reactor.core.publisher.FluxGenerate$ForEachBiConsumer.accept(FluxGenerate.java:145)
at reactor.core.publisher.FluxGenerate$ForEachBiConsumer.accept(FluxGenerate.java:114)
at reactor.core.publisher.FluxGenerate$SubscriberProxy.request(FluxGenerate.java:245)
at reactor.core.subscriber.SubscriberBarrier.doRequest(SubscriberBarrier.java:146)
at reactor.core.publisher.FluxLog$LoggerBarrier.doRequest(FluxLog.java:160)
at reactor.core.subscriber.SubscriberBarrier.request(SubscriberBarrier.java:135)
at reactor.core.util.DeferredSubscription.set(DeferredSubscription.java:71)
at reactor.core.publisher.FluxSubscribeOn$SubscribeOnPipeline.onSubscribe(FluxSubscribeOn.java:129)
at reactor.core.publisher.FluxLog$LoggerBarrier.doOnSubscribe(FluxLog.java:122)
at reactor.core.subscriber.SubscriberBarrier.onSubscribe(SubscriberBarrier.java:67)
at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:72)
at reactor.core.publisher.FluxLog.subscribe(FluxLog.java:67)
at reactor.core.publisher.FluxSubscribeOn$SourceSubscribeTask.run(FluxSubscribeOn.java:363)
at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:919)
at reactor.core.publisher.Computations$ProcessorWorker.onNext(Computations.java:883)
at reactor.core.publisher.WorkQueueProcessor$QueueSubscriberLoop.run(WorkQueueProcessor.java:842)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
As with RxJava, if you are using create(), you are on your own dealing with cancellation and backpressure. You could instead build the generator from standard operators:
ConnectableFlux<Double> secureRandomFlux = Flux.using(
() -> new SecureRandom(),
sr -> Flux.interval(10, TimeUnit.MILLISECONDS)
.map(v -> sr.nextDouble())
.onBackpressureDrop()
sr -> { }
).publish();
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With