
Async cleanup on observable sequence termination

Tags:

rx-java

I need to execute an asynchronous method (e.g. a cleanup job) when an observable completes or fails. If the cleanup method fails, the observable chain should fail as well.

Is there a standard way of doing this?

Suppose I have a sequence observable, source, and an asyncCleanup() method that returns an observable of the cleanup result.

The side-effect operators such as doOnCompleted/doOnTerminate/doOnUnsubscribe do not seem suitable:

source.doOnUnsubscribe(()->asyncCleanup().subscribe());

The observable chain will succeed even if asyncCleanup() fails, so asyncCleanup() has to be part of the same chain.
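The same pitfall can be reproduced outside RxJava. The sketch below is an analogy, not RxJava code: it swaps in the JDK's CompletableFuture so that it runs without extra dependencies, and the class FireAndForgetCleanup and the method failingCleanup() are hypothetical names. The cleanup is started as a side effect and its outcome is never joined into the returned future, so the caller sees success even though the cleanup failed.

```java
import java.util.concurrent.CompletableFuture;

// Analogy only: CompletableFuture stands in for a single-item Observable.
class FireAndForgetCleanup {

    // Hypothetical cleanup job that always fails.
    static CompletableFuture<Void> failingCleanup() {
        return CompletableFuture.failedFuture(new IllegalStateException("cleanup failed"));
    }

    // Side-effect style: the cleanup is triggered on termination, but the
    // future it returns is dropped, so its failure cannot reach the caller.
    static CompletableFuture<String> withSideEffectCleanup(CompletableFuture<String> source) {
        return source.whenComplete((value, err) -> failingCleanup());
    }

    public static void main(String[] args) {
        CompletableFuture<String> result =
            withSideEffectCleanup(CompletableFuture.completedFuture("data"));
        System.out.println(result.isCompletedExceptionally()); // prints "false"
        System.out.println(result.join());                     // prints "data"
    }
}
```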

The best I came up with is the following:

source.onErrorResumeNext(err->asyncCleanup()
         .flatMap(cleanupResult->Observable.error(err)))
      .concatWith(asyncCleanup()
         .flatMap(cleanupResult->Observable.empty()));

In case of failure, onErrorResumeNext calls asyncCleanup() and maps the result back to the original error. In case of success, the source is concatenated with asyncCleanup() mapped to an empty sequence. Unfortunately, this does not work if there is a take() or a similar limiting operator downstream, because the concatenated observable may never be subscribed to.

UPDATE 2017-08-01: The question is specifically about sequence observables. For a single-item observable source the solution is quite simple:

singleItemSource
    .onErrorResumeNext(err->asyncCleanup()
        .flatMap(cleanupResult->Observable.error(err)))
    .flatMap(single->asyncCleanup()
        .map(cleanupResult->single));
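For comparison, the single-item behaviour described above (run the cleanup on success or failure, keep the original outcome, fail if the cleanup fails) can be sketched with the JDK alone. This is an analogy using CompletableFuture rather than RxJava; ChainedCleanup and withCleanup are hypothetical names, and the sketch assumes the cleanup supplier does not throw synchronously.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Analogy only: CompletableFuture stands in for a single-item Observable.
class ChainedCleanup {

    // Runs asyncCleanup after source terminates. A cleanup failure fails the
    // result; otherwise the source's own value or error is passed through.
    static <T> CompletableFuture<T> withCleanup(
            CompletableFuture<T> source,
            Supplier<CompletableFuture<Void>> asyncCleanup) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.whenComplete((value, err) ->
            asyncCleanup.get().whenComplete((ignored, cleanupErr) -> {
                if (cleanupErr != null) {
                    result.completeExceptionally(cleanupErr); // cleanup failed
                } else if (err != null) {
                    result.completeExceptionally(err);        // original error
                } else {
                    result.complete(value);                   // original item
                }
            }));
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = withCleanup(
            CompletableFuture.completedFuture("item"),
            () -> CompletableFuture.completedFuture(null));
        System.out.println(ok.join()); // prints "item"

        CompletableFuture<String> cleanupFailed = withCleanup(
            CompletableFuture.completedFuture("item"),
            () -> CompletableFuture.failedFuture(new IllegalStateException("cleanup failed")));
        System.out.println(cleanupFailed.isCompletedExceptionally()); // prints "true"
    }
}
```

Unlike the side-effect variant, the cleanup's outcome decides how the returned future completes, which is the property the question asks for.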
yurgis asked Sep 07 '25 07:09


2 Answers

The idea you propose breaks the Observable contract, because:

Rx Design Guidelines, 6.8 - Unsubscription should not throw

The only solution that comes to mind is to make asyncCleanup() return the same type as the preceding sequence and, on success, return for example Observable.empty(); then you can use the concat operator.

public Observable<Long> asyncCleanup() {
...
    return Observable.empty();
}

// Concatenate asyncCleanup() so that it runs after the sequence completes;
// concatWith does not subscribe to it if the sequence fails with an error
Observable.rangeLong(1, 10)
    .concatWith(asyncCleanup())
    .subscribe(System.out::println, System.out::println);

Hope it helps...

Rafael Guillen answered Sep 10 '25 11:09


There is a flatMap overload that lets you map the upstream's onXXX events into Observables:

Observable.range(1, 10)
    .flatMap(Observable::just, e -> asyncCleanup(), () -> asyncCleanup())
    .subscribe(...);
akarnokd answered Sep 10 '25 12:09