
Spring WebClient and longpolling

I want to use Spring's reactive WebClient to poll a REST endpoint that uses long-polling.

The endpoint provides messages for a chat-channel. When I call it and there is no message, it blocks (i.e. does not return) until a message appears (or 30 seconds pass).

So, in a synchronous world, I would dedicate a thread to monitor this channel, call the endpoint via RestTemplate, wait for the result, write it to a shared queue and start the next request. A consumer can then react to new items appearing on the queue.
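
For reference, the synchronous variant described above could be sketched roughly like this, using only the JDK. The RestTemplate call is replaced by a hypothetical `fakeEndpoint` supplier that waits up to 200 ms (instead of 30 seconds) and returns null on timeout; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class BlockingPoller {

    // Hypothetical stand-in for the RestTemplate call: waits up to 200 ms for a
    // message and returns null on timeout.
    static Supplier<String> fakeEndpoint(BlockingQueue<String> server) {
        return () -> {
            try {
                return server.poll(200, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the stop request visible
                return null;
            }
        };
    }

    // Dedicated thread: call the endpoint, push the reply to the shared queue,
    // then start the next request.
    static Thread start(Supplier<String> endpoint, BlockingQueue<String> shared) {
        Thread poller = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                String message = endpoint.get();
                if (message != null) {
                    shared.add(message);
                }
            }
        }, "channel-poller");
        poller.setDaemon(true);
        poller.start();
        return poller;
    }

    // Consumer side: read two messages from the shared queue, then stop the poller.
    static List<String> demo() {
        BlockingQueue<String> server = new LinkedBlockingQueue<>(List.of("hello", "world"));
        BlockingQueue<String> shared = new LinkedBlockingQueue<>();
        Thread poller = start(fakeEndpoint(server), shared);
        List<String> seen = new ArrayList<>();
        try {
            for (int i = 0; i < 2; i++) {
                String message = shared.poll(2, TimeUnit.SECONDS);
                if (message != null) {
                    seen.add(message);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            poller.interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```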

In the reactive world, this is a bit different. Ideally, the consumer would subscribe to a Flux of messages. The question is how to construct this Flux.

The logic should be:

Mono<String> message = WebClient.get(). […] .bodyToMono(String.class);
// When the mono completes, create a new one just as described above
// Combine all of the monos into a Flux
flux.subscribe(message -> System.out.println("New message: " + message));

I think that I need some kind of switch… operator but I can't find the correct one.

David asked Jan 17 '26 08:01



1 Answer

As @123 pointed out:

You can just use repeat(), i.e. WebClient.get(). […] .bodyToMono(String.class).repeat(), which gives you a Flux of the replies and only starts the next request when the previous one is done.

Actually, what is required here is defer() and repeat(): defer() takes a supplier of a Mono, and repeat() re-subscribes to that Mono once the previous subscription completes. Each re-subscription calls the supplier again, and thus a new HTTP request is started.
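
A small sketch of that difference, assuming reactor-core is on the classpath. The `receiveMessage(calls)` helper here is a hypothetical stand-in that builds its Mono eagerly and counts how often it is called; it is not the real WebClient call. The `take(3)` is only there to bound the demo.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

final class DeferRepeatDemo {

    // Hypothetical stand-in for the request: the value is fixed when the
    // method is called, not when the Mono is subscribed.
    static Mono<String> receiveMessage(AtomicInteger calls) {
        return Mono.just("message " + calls.incrementAndGet());
    }

    // With defer(): the supplier runs again on every re-subscription.
    static List<String> withDefer() {
        AtomicInteger calls = new AtomicInteger();
        return Mono.defer(() -> receiveMessage(calls))
                .repeat()
                .take(3)
                .collectList()
                .block();
    }

    // Without defer(): receiveMessage is called once and the same Mono is replayed.
    static List<String> withoutDefer() {
        AtomicInteger calls = new AtomicInteger();
        return receiveMessage(calls)
                .repeat()
                .take(3)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withDefer());    // [message 1, message 2, message 3]
        System.out.println(withoutDefer()); // [message 1, message 1, message 1]
    }
}
```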

Also, since this runs forever, it will lead to an unclean application shutdown: if a shutdown occurs, there is probably an HTTP request in flight. To end the Flux cleanly, takeUntilOther() can be used, which takes another Publisher (like an EmitterProcessor). Then, in a @PreDestroy method, you can call shutdown.onNext(true), which completes the Flux and cancels the HTTP request in flight.

My solution now looks like this:

   Mono.defer(() -> receiveMessage())
   .repeat()
   .takeUntilOther(shutdown)
   .subscribe(message -> System.out.println("New message: " + message));
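
Note that EmitterProcessor has been deprecated since Reactor 3.4 in favor of the Sinks API. A rough sketch of the same shutdown idea with `Sinks.one()`, assuming reactor-core 3.4 or later: `receiveMessage()` is a hypothetical stand-in that answers after 50 ms, and the delayed `tryEmitValue(true)` plays the role of the @PreDestroy method.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

final class ShutdownDemo {

    // Hypothetical stand-in for the long-poll request: replies after 50 ms.
    static Mono<String> receiveMessage() {
        return Mono.delay(Duration.ofMillis(50)).map(tick -> "message");
    }

    static List<String> collectUntilShutdown() {
        Sinks.One<Boolean> shutdown = Sinks.one();

        // Simulates the @PreDestroy hook firing after 300 ms.
        Mono.delay(Duration.ofMillis(300))
                .subscribe(tick -> shutdown.tryEmitValue(true));

        return Mono.defer(ShutdownDemo::receiveMessage)
                .repeat()                          // next request after each reply
                .takeUntilOther(shutdown.asMono()) // complete once shutdown emits
                .collectList()
                .block(Duration.ofSeconds(5));     // fails if the flux never completes
    }

    public static void main(String[] args) {
        System.out.println("Received " + collectUntilShutdown().size() + " messages before shutdown");
    }
}
```

In a real application the subscriber would stay attached instead of collecting into a list; the list is only used here so the demo terminates and its result can be inspected.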

David answered Jan 19 '26 20:01



