I prepare a batch of requests that I want to send in parallel to an external web service. In the same flow, I process each response directly (e.g. inserting something into a database).
Problem: I want to track the maximum time a single request took, excluding the processing. As written, this only measures the total time, including the processing of every response:
StopWatch watch = new StopWatch();
watch.start();
Flux.fromIterable(requests)
    .flatMap(req -> webClient.send(req, MyResponse.class)
        .doOnSuccess(rsp -> processResponse(rsp))) // assume some longer routine
    .collectList()
    .block();
watch.stop();
System.out.println(watch.getTotalTimeMillis());
Question: how can I measure the maximum time the requests took, excluding the processResponse() time?
When you call elapsed() on a Mono, you get back a Mono of a Tuple2 that holds both the elapsed time (in milliseconds) and the original value. You have to unwrap the tuple to use either of them. I wrote an example (a bit simplified from your code) as a test to see it working:
@Test
public void elapsed() {
    Flux.fromIterable(List.of(1, 2, 3, 4, 5))
        .flatMap(req -> Mono.delay(Duration.ofMillis(100L * req))
            .map(it -> "response_" + req)
            .elapsed()
            .doOnNext(it -> System.out.println("I took " + it.getT1() + " MS"))
            .map(Tuple2::getT2)
            .doOnSuccess(rsp -> processResponse(rsp)))
        .collectList()
        .block();
}

@SneakyThrows
public void processResponse(Object it) {
    System.out.println("This is the response: " + it);
    Thread.sleep(1000);
}
The output looks like this:
I took 112 MS
This is the response: response_1
I took 205 MS
This is the response: response_2
I took 305 MS
This is the response: response_3
I took 403 MS
This is the response: response_4
I took 504 MS
This is the response: response_5
Those numbers include both the delay (which in your case would be the webClient.send() call) and a little overhead from the reactive pipeline itself. The time is measured between subscription (which happens when flatMap subscribes to the inner Mono for a specific request) and the next signal (the result of the map in my example, the response of the webClient request in yours).
Your code would look something like this:
Flux.fromIterable(requests)
    .flatMap(req -> webClient.send(req, MyResponse.class)
        .elapsed()
        .doOnNext(it -> System.out.println("I took " + it.getT1() + " MS"))
        .map(Tuple2::getT2)
        .doOnSuccess(rsp -> processResponse(rsp))) // assume some longer routine
    .collectList()
    .block();
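The snippets so far print one duration per request, while the question asks for the maximum. One way to get it is to feed each elapsed value into a thread-safe accumulator. Below is a minimal sketch using only the JDK; MaxElapsedTracker, record and maxMillis are hypothetical names made up for this illustration, not part of Reactor, and the sample values are the ones from the output above.

```java
import java.util.concurrent.atomic.LongAccumulator;

// Hypothetical helper (not part of Reactor): remembers the largest duration seen so far.
final class MaxElapsedTracker {
    // LongAccumulator can be updated from several threads at once, which matters
    // because the inner publishers of flatMap may emit on different threads.
    private final LongAccumulator max = new LongAccumulator(Long::max, 0L);

    // Record the duration of one request, in milliseconds.
    void record(long elapsedMillis) {
        max.accumulate(elapsedMillis);
    }

    // Largest duration recorded so far, or 0 if nothing was recorded.
    long maxMillis() {
        return max.get();
    }

    public static void main(String[] args) {
        MaxElapsedTracker tracker = new MaxElapsedTracker();
        // Sample durations taken from the output shown earlier.
        for (long ms : new long[] {112, 205, 305, 403, 504}) {
            tracker.record(ms);
        }
        System.out.println("Slowest request: " + tracker.maxMillis() + " ms");
    }
}
```

In the pipeline, the idea would be to call tracker.record(it.getT1()) inside doOnNext (in place of, or next to, the println) and to read tracker.maxMillis() after block() returns. Because elapsed() sits before doOnSuccess, the time spent in processResponse() is not included.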
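Note that if you want to use a stopwatch instead, that is also possible. The example below assumes the Apache Commons Lang StopWatch, which has getTime(); with Spring's StopWatch you would call getTotalTimeMillis() instead: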
Flux.fromIterable(List.of(1, 2, 3, 4, 5))
    .flatMap(req -> {
        StopWatch stopWatch = new StopWatch();
        return Mono.fromRunnable(stopWatch::start)
            .then(Mono.delay(Duration.ofMillis(100L * req))
                .map(it -> "response_" + req)
                .doOnNext(it -> {
                    stopWatch.stop();
                    System.out.println("I took " + stopWatch.getTime() + " MS");
                })
                .doOnSuccess(this::processResponse));
    })
    .collectList()
    .block();
Personally, I would recommend the .elapsed() solution, since it is a bit cleaner.