I have a method that should return a result or an error message after executing the confirm method, and then do some background job. I wrote something like the code below, but I'm not sure about the method invoked in then. Am I wrong? If so, how can I run a method in the background using WebFlux?
public Mono<Void> someMethod(...){
return someReactiveApiClient.confirm().onErrorMap(...).then(doSomeBackgroundJob);
}
As @zlaval mentioned, publishOn() can be used to switch execution contexts. The publishOn operator influences the threading context where the rest of the operators in the chain below it run, up to a new occurrence of publishOn. So the placement of publishOn is significant.
You can write your own scheduler or use one of the existing schedulers from the Schedulers class.
So, if you have code like this (I am using the elastic scheduler; note that Schedulers.elastic() is deprecated in newer Reactor versions in favor of Schedulers.boundedElastic()):
Mono.just("Hello").log()
.onErrorMap(err -> new RuntimeException("my exception"))
.publishOn(Schedulers.elastic())
.then(Mono.just("World").log())
.subscribe();
you will see this printed in the console:
18:30:11.337 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
18:30:11.340 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
18:30:11.340 [main] INFO reactor.Mono.Just.1 - | onNext(Hello)
18:30:11.344 [main] INFO reactor.Mono.Just.1 - | onComplete()
18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | request(unbounded)
18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | onNext(World)
18:30:11.344 [elastic-2] INFO reactor.Mono.Just.2 - | onComplete()
Process finished with exit code 0
So the second Mono, which emits World, runs on a separate thread named elastic-2 rather than on the main thread.
Note that if your confirm method emits an error, the background task won't run. Execution stops there.
For example, if you have something like this:
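To make the effect of where publishOn sits easier to observe, here is a minimal sketch that records which thread runs a step before publishOn and which runs a step after it. The class and method names are made up for illustration, it uses Schedulers.boundedElastic() rather than the elastic scheduler from the example, and block() is there only so the demo can wait for the result.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class PublishOnPlacement {

    // Returns { thread that ran the step before publishOn, thread that ran the step after it }.
    static String[] threadsAroundPublishOn() {
        String[] names = new String[2];
        Mono.just("Hello")
            // Above publishOn: runs on the thread that subscribes (the caller here).
            .map(v -> { names[0] = Thread.currentThread().getName(); return v; })
            .publishOn(Schedulers.boundedElastic())
            // Below publishOn: runs on a thread owned by the scheduler.
            .map(v -> { names[1] = Thread.currentThread().getName(); return v; })
            .block(); // demo only; do not block inside a WebFlux handler
        return names;
    }

    public static void main(String[] args) {
        String[] names = threadsAroundPublishOn();
        System.out.println("before publishOn: " + names[0]);
        System.out.println("after publishOn:  " + names[1]);
    }
}
```

When run from main, the first name should be the calling thread and the second a worker thread of the bounded elastic scheduler.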
Mono.error(new ArithmeticException()).log()
.onErrorMap(err -> new RuntimeException("my exception"))
.publishOn(Schedulers.elastic())
.then(Mono.just("World").log())
.subscribe();
then running it gives this output:
18:33:50.544 [main] INFO reactor.Mono.Error.1 - onSubscribe([Fuseable] Operators.EmptySubscription)
18:33:50.546 [main] INFO reactor.Mono.Error.1 - request(unbounded)
18:33:50.547 [main] ERROR reactor.Mono.Error.1 - onError(java.lang.ArithmeticException)
18:33:50.549 [main] ERROR reactor.Mono.Error.1 -
java.lang.ArithmeticException: null
at com.example.schooltimetable.Application.main(Application.java:29)
Process finished with exit code 0
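The same behaviour can be checked without reading logs. The sketch below is only an illustration under assumptions: the class name, the helper method, and the flag are hypothetical, and the job is a trivial Runnable wrapped with Mono.fromRunnable. It reports whether the Mono passed to then() was actually run for a given source.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ThenSkippedOnError {

    // Returns true if the job passed to then() ran for the given source Mono.
    static boolean jobRanAfter(Mono<String> confirm) {
        AtomicBoolean ran = new AtomicBoolean(false);
        // Hypothetical background job: it only flips the flag.
        Mono<Void> job = Mono.fromRunnable(() -> ran.set(true));
        confirm
            .onErrorMap(err -> new RuntimeException("my exception", err))
            .publishOn(Schedulers.boundedElastic())
            .then(job) // subscribed to only if the source completes without error
            .onErrorResume(err -> Mono.empty()) // demo only: swallow the error so block() returns
            .block();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("after success: " + jobRanAfter(Mono.just("ok")));
        System.out.println("after error:   " + jobRanAfter(Mono.error(new ArithmeticException())));
    }
}
```

If the job must run even when confirm fails, then() is not the right operator on its own; the error has to be handled before it reaches then().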
You need something similar if your background job involves I/O or a network call:
public Mono<Void> someMethod(...){
return someReactiveApiClient.confirm()
.onErrorMap(err -> {
// Your error handling logic. onErrorMap must return the mapped Throwable
return new RuntimeException("confirm failed", err);
})
.publishOn(Schedulers.elastic())
.then(doSomeBackgroundJob);
}
The following code returns the result of confirm() and, after the original publisher has completed successfully, calls the job. publishOn indicates that the job has to run on a different thread. Choose the scheduler that best fits your requirements.
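public Mono<YourRequiredType> someMethod(...){
return someReactiveApiClient.confirm().onErrorMap(...)
.publishOn(Schedulers.boundedElastic())
// Mono has no doOnComplete (that is a Flux operator); doOnSuccess is the Mono counterpart.
// Assumes doSomeBackgroundJob is a plain, non-reactive method.
.doOnSuccess(result -> doSomeBackgroundJob());
}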
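If the caller should get the result of confirm() without waiting for the job to finish, one possible variation is to start the job as a separate subscription from inside doOnSuccess. This is a sketch under assumptions, not the original answer's code: confirm() here is a hypothetical stand-in for someReactiveApiClient.confirm(), the job is a plain Runnable, and the class and method names are invented for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FireAndForgetSketch {

    // Hypothetical stand-in for someReactiveApiClient.confirm().
    static Mono<String> confirm() {
        return Mono.just("confirmed");
    }

    // Emits confirm()'s result; on success, starts the job on a boundedElastic
    // thread as an independent subscription and does not wait for it.
    static Mono<String> confirmThenRunInBackground(Runnable job) {
        return confirm()
            .onErrorMap(err -> new IllegalStateException("confirm failed", err))
            .doOnSuccess(result -> Mono.fromRunnable(job)
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(
                    ignored -> { },
                    err -> System.err.println("background job failed: " + err)));
    }

    // Demo helper: blocks for the result, then waits up to 5 seconds for the job.
    static String demo() {
        CountDownLatch done = new CountDownLatch(1);
        String result = confirmThenRunInBackground(done::countDown).block();
        boolean jobFinished;
        try {
            jobFinished = done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            jobFinished = false;
        }
        return result + ", job finished: " + jobFinished;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off is that the job is detached from the returned Mono: its errors never reach the caller, so they have to be handled in the inner subscribe, and cancelling the outer Mono does not cancel the job.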