I need to execute an asynchronous method (e.g. a cleanup job) on an observable completion/failure. Also if the cleanup method fails the observable chain should also fail.
Is there a standard way of doing this?
Suppose I have a sequence Observable source and asyncCleanup() method that returns an observable cleanup result.
The side effect methods such as doOnCompleted/doOnTerminate/doOnUnsubscribe do not seem to be suitable:
source.doOnUnsubscribe(()->asyncCleanup().subscribe());
The observable chain will succeed even if asyncCleanup() fails. So asyncCleanup() should be a part of the same chain.
The best I came up with is the following:
source.onErrorResumeNext(err->asyncCleanup()
.flatMap(cleanupResult->Observable.error(err)))
.concatWith(asyncCleanup()
.flatMap(cleanupResult->Observable.empty()));
In case of failure, onErrorResumeNext will call asyncCleanup() and will map back to to the original error. In case of success, we can concatenate with asyncCleanup() mapped to an empty sequence. Unfortunately, it is not going to work if there is a take() or similar limiting operator downstream, which may cause the concatenated observable not even subscribe.
UPDATE 2017-08-01: The question is specifically about sequence observables. For a single-item observable source the solution is quite simple:
singleItemSource
.onErrorResumeNext(err->asyncCleanup()
.flatMap(cleanupResult->Observable.error(err)))
.flatMap(single->asyncCleanup()
.map(cleanupResult->single));
The idea you propose breaks the Observable
contract, because:
Rx Design Guidelines, 6.8 - Unsubscription should not throw
The only thing that comes to my mind to solve your problem is to turn asyncCleanup()
to the same type of the preceding sequence and in case of success return for example Observable.empty(), the you can use the concat
operator.
public Observable<Long> asyncCleanup() {
...
return Observable.empty();
}
//Concat asyncCleanup() to be executed after the sequence
Observable.rangeLong(1, 10)
.concatWith(asyncCleanup())
.subscribe(System.out::println, System.out::println);
Hope it helps...
There is a flatMap
overload that let's you map the onXXX
events of the upstream into Observable
s:
Observable.range(1, 10)
.flatMap(Observable::just, e -> asyncCleanup(), () -> asyncCleanup())
.subscribe(...);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With