I have an API that takes an Observable that triggers an event.
I want to return an Observable that emits a value every defaultDelay seconds if an Internet connection is detected, and delays numberOfFailedAttempts^2 times if there's no connection.
I've tried a bunch of various styles, the biggest problem I'm having is retryWhen's observable is only evaluated once:
Observable
.interval(defaultDelay,TimeUnit.MILLISECONDS)
.observeOn(Schedulers.io())
.repeatWhen((observable) ->
observable.concatMap(repeatObservable -> {
if(internetConnectionDetector.isInternetConnected()){
consecutiveRetries = 0;
return observable;
} else {
consecutiveRetries++;
int backoffDelay = (int)Math.pow(consecutiveRetries,2);
return observable.delay(backoffDelay, TimeUnit.SECONDS);
}
}).onBackpressureDrop())
.onBackpressureDrop();
Is there any way to do what I'm attempting to do? I found a related question (can't find it searching right now), but the approach taken didn't seem to work with a dynamic value.
In your code there are two mistakes:
interval you'd better use something like just, or fromCallable as I did in sample below.repeatWhen's inner function you need to return new delayed observable source, so instead of observable.delay() you have to return Observable.timer().Working code:
public void testRepeat() throws InterruptedException {
logger.info("test start");
int DEFAULT_DELAY = 100; // ms
int ADDITIONAL_DELAY = 100; // ms
AtomicInteger generator = new AtomicInteger(0);
AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive
Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
.repeatWhen(counts -> {
AtomicInteger retryCounter = new AtomicInteger(0);
return counts.flatMap(c -> {
int retry = 0;
if (connectionAlive.get()) {
retryCounter.set(0); // reset counter
} else {
retry = retryCounter.incrementAndGet();
}
int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
});
})
.subscribe(v -> logger.info("got {}", v));
Thread.sleep(220);
logger.info("connection dropped");
connectionAlive.set(false);
Thread.sleep(2000);
logger.info("connection is back alive");
connectionAlive.set(true);
Thread.sleep(2000);
subscription.dispose();
logger.info("test complete");
}
See detailed article about repeatWhen here.
I've always found retryWhen to be somewhat low-level so for exponential backoff I use a a builder (like Abhijit) that is unit tested and available for RxJava 1.x at rxjava-extras. I'd suggest using a capped version so that the exponential increase of delay won't go beyond a maximum value you define.
This is how you use it:
observable.retryWhen(
RetryWhen.exponentialBackoff(
delay, maxDelay, TimeUNIT.SECONDS)
.build());
I disagree that retryWhen is buggy but if you find a bug report it to RxJava. Bugs are fixed fast!
You'll need rxjava-extras 0.8.0.6 or later which is on Maven Central:
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-extras</artifactId>
<version>0.8.0.6</version>
</dependency>
Let me know if you need the RxJava 2.x version. The same functionality is available in rxjava2-extras from 0.1.4.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With