I was trying out some of the new features in Java 9, so I put together a test with a publisher that emits numbers at a given rate. I also implemented a Subscriber that listens to those publications and just prints them to the console.
I might not fully understand how to use this API, though, because the onNext() method is not printing anything and getLastItem() only returns 0.
The only part that seems to work is onSubscribe(), which correctly initialises the lastItem variable.
@Test
public void testReactiveStreams(){
    //Create Publisher
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
    //Register Subscriber
    TestIntegerSubscriber subscriber = new TestIntegerSubscriber();
    publisher.subscribe(subscriber);
    assertTrue(publisher.hasSubscribers());
    //Publish items
    System.out.println("Publishing Items...");
    List.of(1,2,3,4,5).stream().forEach(i -> {
        publisher.submit(i);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // blah
        }
    });
    assertEquals(5, subscriber.getLastItem());
    publisher.close();
}
private class TestIntegerSubscriber implements Flow.Subscriber<Integer> {
    private int lastItem;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("Subscribed");
        lastItem = 0;
    }
    @Override
    public void onNext(Integer item) {
        System.out.println("Received : "+item);
        lastItem += 1; // expect increment by 1
        assertTrue(lastItem == item);
    }
    @Override
    public void onError(Throwable throwable) {
        // nothing for the moment
    }
    @Override
    public void onComplete() {
        System.out.println("Completed");
    }
    public int getLastItem(){
        return lastItem;
    }
}
Can someone please tell me what I am doing wrong in my test? I would expect the test to print those numbers and return 5 as the last item.
I have to say I have only used Observables and Subjects in Angular2 so far, although they seem easier to understand.
The Flow API implements a feature called backpressure (explained here in the context of RxJava), which means the publisher should not be able to overwhelm the subscriber by publishing items faster than it can process them. The way JDK 9 implements that is by having the subscriber request items from the subscription.
For your test, the TestIntegerSubscriber should request items in onSubscribe, let's say 10, and keep track of how often onNext has been called, so it can request more once those 10 items have been pushed.
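A minimal sketch of such a subscriber might look like the following. The names `BatchRequestDemo`, `BatchingSubscriber`, `publishAndCollect`, and the batch size of 10 are assumptions made for illustration, not part of your test. The latch is there only so the calling thread can wait until the publisher has signalled completion, because `SubmissionPublisher` delivers items asynchronously.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BatchRequestDemo {

    /** Hypothetical subscriber that requests items in batches of BATCH_SIZE. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        private static final int BATCH_SIZE = 10; // illustrative value
        private final List<Integer> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int receivedInBatch;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // Store the subscription and signal initial demand;
            // without this call, onNext is never invoked.
            this.subscription = subscription;
            subscription.request(BATCH_SIZE);
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            // Once the current batch has been consumed, ask for the next one.
            if (++receivedInBatch == BATCH_SIZE) {
                receivedInBatch = 0;
                subscription.request(BATCH_SIZE);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..count and returns what the subscriber received. */
    static List<Integer> publishAndCollect(int count) {
        BatchingSubscriber subscriber = new BatchingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete once the buffered items are delivered
        try {
            // Delivery happens on another thread, so wait for completion.
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(25));
    }
}
```

In a unit test, waiting on a latch like this is usually more reliable than sleeping for a fixed time before asserting on the subscriber's state.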
I wrote a section about the Flow API that goes into a little more detail. It also describes the interaction between publisher, subscriber, and subscription:
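1. Create a Publisher and a Subscriber.
2. Subscribe the subscriber with Publisher::subscribe.
3. The publisher creates a Subscription and calls Subscriber::onSubscribe with it so the subscriber can store the subscription.
4. At some point the subscriber calls Subscription::request to request a number of items.
5. The publisher starts handing items to the subscriber by calling Subscriber::onNext. It will never publish more than the requested number of items.
6. The publisher might at some point be depleted or run into trouble and call Subscriber::onComplete or Subscriber::onError, respectively.
7. The subscriber might either continue to request more items every now and then or cut the connection by calling Subscription::cancel.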
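The interaction described above can be sketched end to end as follows. This is only an illustration under stated assumptions: `FlowLifecycleDemo`, `OneByOneSubscriber`, `run`, and the `limit` parameter are hypothetical names, and the subscriber deliberately pulls one item at a time and then cancels, to show that the publisher never pushes more than was requested.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class FlowLifecycleDemo {

    /** Hypothetical subscriber: pulls one item at a time, cancels after `limit` items (limit >= 1). */
    static final class OneByOneSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        private final int limit;
        private Flow.Subscription subscription;

        OneByOneSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription; // keep it for later request/cancel calls
            subscription.request(1);          // signal demand for exactly one item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() >= limit) {
                subscription.cancel();        // cut the connection; no further signals expected
                finished.countDown();
            } else {
                subscription.request(1);      // ask for the next item
            }
        }

        @Override
        public void onError(Throwable throwable) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown();             // publisher was closed before the limit was reached
        }
    }

    /** Publishes 1..itemsToPublish and returns the items the subscriber accepted. */
    static List<Integer> run(int itemsToPublish, int limit) {
        OneByOneSubscriber subscriber = new OneByOneSubscriber(limit);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= itemsToPublish; i++) {
                publisher.submit(i);
            }
        }
        try {
            // Wait until the subscriber has either cancelled or been completed.
            subscriber.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(run(5, 3));
    }
}
```

Requesting a single item per call keeps the demand at zero whenever the subscriber stops asking, which is why the remaining published items are never delivered after the cancel.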