I have an array of URLs and ports. For each of them, I need to send a message and receive a response:
Flux.fromArray(trackersArray)
    .flatMap(tracker ->
        ConnectToTracker.connect(tracker.getTracker(), tracker.getPort()))
I communicate with the servers over UDP, so I can't tell whether a server is alive unless I send it a message that, by some set of rules, it needs to respond to.
ConnectToTracker.connect may send an onNext signal if the server responds, or an onError signal if, for example, the server doesn't respond (SocketTimeoutException) or on any other failure (a general IOException).
I don't want to terminate the flux if, for example, the onError signal is a SocketTimeoutException. Instead, I would like to try communicating with every tracker I have.
This link contains all the operators I can use to handle errors, but none that lets me ignore them.
I'm using Reactor 3 if this matters.
Update:
I came up with an ugly trick, but it works:
Flux.fromArray(trackersArray)
    .handle((Tracker tracker, SynchronousSink<ConnectResponse> sink) -> {
        ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
            .subscribe(sink::next, error -> {
                if (!(error instanceof SocketTimeoutException))
                    sink.error(error);
            }, sink::complete);
    })
Please feel free to answer if you have anything better.
Since you are already processing the URLs in a flatMap, use onErrorResume(e -> Mono.empty()). This lets flatMap ignore the error.
Edit: apply it inside the flatMap, on the right-hand side of the lambda.
Flux.fromArray(trackersArray)
    .flatMap(tracker ->
        ConnectToTracker.connect(tracker.getTracker(), tracker.getPort())
            .onErrorResume(SocketTimeoutException.class, __ -> Mono.empty()))
Maybe this is a better way of doing the same thing: it recovers only from SocketTimeoutException, and any other exception still goes to onError.
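To make the behaviour concrete, here is a minimal, self-contained sketch of the same idea. The class SkipTimeoutsDemo, its connect method, and the host names are hypothetical stand-ins for ConnectToTracker.connect (which isn't shown in the question); the stand-in fails synchronously with a SocketTimeoutException for one tracker instead of doing real UDP I/O.

```java
import java.net.SocketTimeoutException;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class SkipTimeoutsDemo {

    // Hypothetical stand-in for ConnectToTracker.connect(...):
    // the tracker on port 2 "times out", the others answer immediately.
    static Mono<String> connect(String host, int port) {
        if (port == 2) {
            return Mono.error(new SocketTimeoutException("no response from " + host));
        }
        return Mono.just(host + ":" + port + " ok");
    }

    static List<String> collectResponses() {
        Integer[] ports = {1, 2, 3};
        return Flux.fromArray(ports)
                .flatMap(port -> connect("tracker" + port, port)
                        // Applied to the inner Mono: a timeout becomes an empty
                        // result, so the outer Flux never sees the error.
                        // Any other exception type still terminates the Flux.
                        .onErrorResume(SocketTimeoutException.class, e -> Mono.empty()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(collectResponses());
    }
}
```

The key point is where the operator sits: placed on the inner publisher, it only replaces that one tracker's result, whereas placed after flatMap it would replace the whole remaining sequence.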
Now we have Flux.onErrorContinue() (in reactor.core.publisher) in version 3.3.2, which lets the sequence keep sending onNext() signals when processing some element fails with onError(). Add log() and you will see what happens more clearly.
The handler's signature is (throwable, instance), which is useful if you want to log the element that caused the error.
Flux.fromIterable(aList)
    .flatMap(this::xxxx)
    .onErrorContinue((throwable, o) -> {
        log.error("Error while processing {}. Cause: {}", o, throwable.getMessage());
    })
....
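As a rough illustration of the (throwable, instance) handler, here is a small runnable sketch. The class ContinueOnErrorDemo, its parse method, and the skipped list are made up for the example, and it assumes the failure is thrown by the mapping function itself. An error signalled later by the inner publisher (as a real network call would do) may not carry the element, so treat this as a sketch rather than a drop-in solution for the tracker case.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ContinueOnErrorDemo {

    // Elements handed to the onErrorContinue handler (hypothetical bookkeeping).
    static final List<Object> skipped = new ArrayList<>();

    // Hypothetical mapper: throws NumberFormatException synchronously
    // for input that is not a number.
    static Mono<Integer> parse(String s) {
        return Mono.just(Integer.parseInt(s));
    }

    static List<Integer> parseAll() {
        skipped.clear();
        return Flux.just("1", "x", "3")
                .flatMap(ContinueOnErrorDemo::parse)
                // Drop the failing element, record it, and keep going.
                .onErrorContinue((throwable, element) -> skipped.add(element))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(parseAll() + " skipped: " + skipped);
    }
}
```

Unlike onErrorResume, this operator changes how upstream operators react to errors, and only operators that support it take part, so it is worth checking the Javadoc of each operator in the chain.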