Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

submit method won't invoke onNext FLOW STREAM API JAVA

I'm learning about the FLOW Stream API in Java and I'm currently creating an example based on the one that is on oracle community. The problem is that I don't get to see the expected output but just the SUBSCRIBING string that is printed inside onSubscribe method. I already checked and found submissionpublisher-on-submit-not-invoking-onnext-of-subscriber on StackOverflow, but didn't work because I was already calling request(Long N).

import java.util.concurrent.Flow;

public class Computer<T> implements Flow.Subscriber<T> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.println("SUBSCRIBING");
        this.subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        System.out.println(String.format("Got %s", item.toString()));
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("DONE");
    }

}

--

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

public class Sensor {

    public static void main(String[] args) {
        SubmissionPublisher<String> submissionPublisher = new SubmissionPublisher<>();
        Computer<String> subscriber = new Computer<>();
        submissionPublisher.subscribe(subscriber);

        List<String> items = List.of("1.25", "1.224", "1.55");
        items.forEach(submissionPublisher::submit);
        submissionPublisher.close();
    }

}

And I just get to see:

SUBSCRIBING

Why onNext method is not being called?

like image 638
Tomás Denis Reyes Sánchez Avatar asked Nov 22 '25 05:11

Tomás Denis Reyes Sánchez


1 Answers

You are not passing a ScheduledExecutorService to the Publisher which is basically an ExecutorService which can schedule tasks to run after a delay or to execute repeatedly with a fixed interval of time in between each execution.

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;

public class Sensor {

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
        SubmissionPublisher<String> submissionPublisher = new SubmissionPublisher<>(executor, 5);
        Computer<String> subscriber = new Computer<>();
        submissionPublisher.subscribe(subscriber);

        List<String> items = List.of("1.25", "1.224", "1.55");
        items.forEach(submissionPublisher::submit);
        submissionPublisher.close();
        executor.shutdown();
    }
}
like image 55
Tomás Denis Reyes Sánchez Avatar answered Nov 23 '25 20:11

Tomás Denis Reyes Sánchez



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!