
retry (or) retryWhen does not seem to work with hot flux

I am trying to use Reactor Core at work, and I am stuck on the retries we need to perform when an error occurs. Below is my sample code before introducing any error:

// Assumed to be a field of the enclosing class (a local variable could not be assigned inside the lambda)
FluxSink<String> mainSink;
// Create the Flux and get a handle to its sink
Flux<String> mainFlux = Flux.create(sink -> {
    mainSink = sink;
}, FluxSink.OverflowStrategy.BUFFER);
// Convert to a hot Flux
ConnectableFlux<String> hotFlux = mainFlux.publish();
// Two operations: append "A", then append "B" to the input
hotFlux.flatMap(o -> Mono.just(o).map(s -> s + "A"))
       .flatMap(o -> Mono.just(o).map(s -> s + "B"))
       .log()
       .subscribe();
// Activate
hotFlux.connect();
// Publish messages to test
Thread.sleep(5000);
int pendingItems = 25;
while (pendingItems > 0) {
    System.out.println("Publishing " + pendingItems + " item");
    mainSink.next(String.valueOf(pendingItems));
    System.out.println("Published " + pendingItems + " item");
    pendingItems--;
}

This works fine.

Coming to the error case, let's say the first operation (appending "A") fails for an item. I am trying to get the following behavior:

  1. The part where I try to add "A" has to be retried 3 times before giving up
  2. Also I would like to have the whole Flux retried 5 times before giving up

I was wondering how I could achieve this.

AtomicInteger count = new AtomicInteger(0);
// Assumed to be a field of the enclosing class (a local variable could not be assigned inside the lambda)
FluxSink<String> mainSink;
// Create the Flux and get a handle to its sink
Flux<String> mainFlux = Flux.create(sink -> {
    mainSink = sink;
}, FluxSink.OverflowStrategy.BUFFER);
// Convert to a hot Flux
ConnectableFlux<String> hotFlux = mainFlux.publish();
// Two operations: append "A" (with an inner retry), then append "B"
hotFlux.flatMap(o -> Mono.just(o).map(s -> {
                  System.out.println("Processing for adding A : " + o);
                  if (count.incrementAndGet() >= 25) {
                      throw new RuntimeException("More than 25th item.. Boom.. !!!");
                  } else {
                      return s + "A";
                  }
              }).retry(5)
                .doOnError(throwable -> System.out.println("**** Inner Error"))
       ).flatMap(o -> Mono.just(o).map(s -> s + "B"))
       .log()
       .subscribe();
// Activate
hotFlux.connect();
// Publish messages to test
Thread.sleep(5000);
int pendingItems = 25;
while (pendingItems > 0) {
    System.out.println("Publishing " + pendingItems + " item");
    mainSink.next(String.valueOf(pendingItems));
    System.out.println("Published " + pendingItems + " item");
    pendingItems--;
}

When I add retry(5) inside the first flatMap as shown above, it works fine: it retries appending "A" 5 times for the 25th item coming in, which is evident from the logs.

However, I am unable to get the complete flux retried (point 2 in my requirements above). I tried adding .retry(3) after the second flatMap, thinking that it would retry the whole flux, but it does not seem to retry. Can someone help with this?

AtomicInteger count = new AtomicInteger(0);
// Assumed to be a field of the enclosing class (a local variable could not be assigned inside the lambda)
FluxSink<String> mainSink;
// Create the Flux and get a handle to its sink
Flux<String> mainFlux = Flux.create(sink -> {
    mainSink = sink;
}, FluxSink.OverflowStrategy.BUFFER);
// Convert to a hot Flux
ConnectableFlux<String> hotFlux = mainFlux.publish();
// Two operations: append "A" (with an inner retry), then append "B"
hotFlux.flatMap(o -> Mono.just(o).map(s -> {
                  System.out.println("Processing for adding A : " + o);
                  if (count.incrementAndGet() >= 25) {
                      throw new RuntimeException("More than 25th item.. Boom.. !!!");
                  } else {
                      return s + "A";
                  }
              }).retry(5)
                .doOnError(throwable -> System.out.println("**** Inner Error"))
       ).flatMap(o -> Mono.just(o).map(s -> s + "B"))
       // Attempt to retry the whole flux
       .retry(3)
       .log()
       .subscribe();
// Activate
hotFlux.connect();
// Publish messages to test
Thread.sleep(5000);
int pendingItems = 25;
while (pendingItems > 0) {
    System.out.println("Publishing " + pendingItems + " item");
    mainSink.next(String.valueOf(pendingItems));
    System.out.println("Published " + pendingItems + " item");
    pendingItems--;
}
Karthick asked Dec 07 '25 10:12



1 Answer

All forms of retry work by re-subscribing to the "retried" source. This works wonders with a cold Flux, but a hot Flux is less suited to it.

Here, with the publish() conversion, there is no guarantee for late subscribers: the retry counts as a late subscriber and sees nothing, because the publish has been disconnected by the original subscriber finishing in error.

What you would need is a way to keep the last item (the one that might cause the exception) and replay it for new subscribers (or rather, for retry attempts).
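
To make the re-subscription point concrete, here is a rough toy model in plain Java. It is not Reactor code: HotSource, RetryingSubscriber and run are made-up names, and connection handling is deliberately ignored. It only shows that a consumer which subscribes again after a failure does not get the failing item a second time unless the source remembers it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only, not Reactor code: all names here are made up for illustration.
class HotRetrySketch {

    /** Pushes items to whoever is subscribed right now; optionally remembers the last item. */
    static class HotSource {
        private final boolean replayLast;
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private String last;

        HotSource(boolean replayLast) {
            this.replayLast = replayLast;
        }

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
            if (replayLast && last != null) {
                subscriber.accept(last); // a late subscriber gets the remembered item
            }
        }

        void unsubscribe(Consumer<String> subscriber) {
            subscribers.remove(subscriber);
        }

        void next(String item) {
            last = item;
            // Iterate over a copy, because a failing subscriber re-subscribes during delivery
            for (Consumer<String> subscriber : new ArrayList<>(subscribers)) {
                subscriber.accept(item);
            }
        }
    }

    /** Fails twice on "C"; after each failure it drops its subscription and subscribes again. */
    static class RetryingSubscriber implements Consumer<String> {
        private final HotSource source;
        private final List<String> log;
        private int failuresLeft = 2;
        private int retriesLeft = 5;

        RetryingSubscriber(HotSource source, List<String> log) {
            this.source = source;
            this.log = log;
        }

        @Override
        public void accept(String item) {
            if (item.equals("C") && failuresLeft > 0) {
                failuresLeft--;
                log.add("error on " + item);
                source.unsubscribe(this);
                if (retriesLeft > 0) {
                    retriesLeft--;
                    source.subscribe(this); // "retry" modelled as subscribing again
                }
                return;
            }
            log.add("ok " + item);
        }
    }

    static List<String> run(boolean replayLast) {
        List<String> log = new ArrayList<>();
        HotSource source = new HotSource(replayLast);
        source.subscribe(new RetryingSubscriber(source, log));
        for (String item : new String[] {"A", "B", "C", "D"}) {
            source.next(item);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [ok A, ok B, error on C, ok D]
        System.out.println(run(true));  // [ok A, ok B, error on C, error on C, ok C, ok D]
    }
}
```

Without the remembered item, "C" is simply lost after the first failure. With it, the re-subscription sees "C" again and can eventually succeed.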

Another issue is that you use create to acquire a FluxSink that you store externally, which is not a good approach.

The good news is that both problems can be solved in one go by using a ReplayProcessor: you correctly get a dedicated sink to manually push data, and in case of error retry will be able to get the error-triggering value out of the history and make another attempt:

@Test
public void test() {
    ReplayProcessor<String> foo =
            ReplayProcessor.create(1);
    FluxSink<String> sink = foo.sink();

    foo.subscribe(System.out::println, System.out::println);

    AtomicInteger transientError = new AtomicInteger(5);
    foo.map(v -> "C".equals(v) && transientError.decrementAndGet() >= 0 ? v + (100 / 0) : v)
            .doOnError(e -> System.err.println("Error, should be retried: " + e))
            .retry(5)
            .subscribe(System.err::println, System.err::println);

    sink.next("A");
    sink.next("B");
    sink.next("C");
    sink.complete();
}

This prints (output from stdout and stderr is interleaved):

A
B
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
Error, should be retried: java.lang.ArithmeticException: / by zero
A
C
B
C
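
For the two retry levels asked about in the question (3 retries for appending "A", 5 for the whole flux), it may help to see how the two budgets combine. The following is a rough sketch in plain Java rather than Reactor. It assumes that a retry of n means one initial attempt plus up to n further attempts, and that every outer retry runs the inner retries again. withRetry and countAttempts are hypothetical helpers, not library calls:

```java
import java.util.concurrent.Callable;

// Plain-Java model of two nested retry budgets; names are hypothetical, this is not Reactor code.
class NestedRetrySketch {

    /** Runs the task once, then up to maxRetries more times while it keeps throwing (maxRetries >= 0). */
    static <T> T withRetry(int maxRetries, Callable<T> task) throws Exception {
        Exception lastError = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                lastError = e;
            }
        }
        throw lastError;
    }

    /** Counts how often a step that never succeeds is executed under both budgets. */
    static int countAttempts(int innerRetries, int outerRetries) {
        int[] attempts = {0};
        Callable<String> appendA = () -> {
            attempts[0]++;
            throw new IllegalStateException("always fails");
        };
        Callable<String> wholePipeline = () -> withRetry(innerRetries, appendA);
        try {
            withRetry(outerRetries, wholePipeline);
        } catch (Exception expected) {
            // both budgets are exhausted
        }
        return attempts[0];
    }

    public static void main(String[] args) {
        System.out.println(countAttempts(3, 5)); // 24
    }
}
```

With a step that never succeeds, that is (1 + 3) x (1 + 5) = 24 executions before the error finally surfaces, so nested budgets multiply rather than add.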
Simon Baslé answered Dec 12 '25 04:12
