Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Synchronous SubmissionPublisher

Is it possible to make Subscriber run on the same thread as the Publisher (synchronously)? I can use CompletableFuture, but it delivers only a single result. But what if I need many results delivered to the subscriber. Please look at this small tests for a better explanation.

  @Test
  public void testCompletableFutureThreads() throws InterruptedException {
    CompletableFuture<String> f = new CompletableFuture<String>();
    f.thenAccept(new Consumer<String>() {
      @Override
      public void accept(String s) {
        System.out.println("accept " + s + " in " + Thread.currentThread().getName());
      }
    });
    Thread.sleep(200);
    System.out.println("send complete from " + Thread.currentThread().getName());
    f.complete("test");
    Thread.sleep(1000);
  }



  @Test
  public void testSubmissionPublisherThreads() throws InterruptedException {

    SubmissionPublisher<String> publisher = new SubmissionPublisher<String>();

    publisher.subscribe(new Flow.Subscriber<String>() {
      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
      }

      @Override
      public void onNext(String item) {
        System.out.println("onNext in " + Thread.currentThread().getName() + " received " + item);
      }

      @Override
      public void onError(Throwable throwable) {
        System.err.println("onError in " + Thread.currentThread().getName());
        throwable.printStackTrace(System.err);
      }

      @Override
      public void onComplete() {
        System.err.println("onComplete in " + Thread.currentThread().getName());
      }
    });

    int i = 10;
    while (i-- > 0) {
      Thread.sleep(100);
      System.out.println("publisher from " + Thread.currentThread().getName());
      publisher.submit("" + System.currentTimeMillis());

    }
  }
like image 217
Alex Avatar asked Nov 22 '25 19:11

Alex


1 Answers

You can use the SubmissionPublisher(Executor, int) constructor and provide an Executor that runs everything on the current thread:

final SubmissionPublisher<String> publisher = new SubmissionPublisher<>(new Executor() {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}, Flow.defaultBufferSize());

or, if you prefer lambdas:

final SubmissionPublisher<String> publisher = new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize());

The default constructor uses ForkJoinPool.commonPool() to send signals.

like image 129
Piotr P. Karwasz Avatar answered Nov 24 '25 09:11

Piotr P. Karwasz



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!