I am trying to measure the execution time of reads from Redis in a reactive application. Looking through the docs, I found that the elapsed()
method does this, so I implemented the code below.
Flux.fromIterable(getActions(httpHeaders))
    .parallel()
    .runOn(Schedulers.parallel())
    .flatMap(actionFact -> methodToReadFromCache(actionFact))
    .sequential();
public Mono<ActionFact> methodToReadFromCache(ActionFact actionFact) {
    return Mono.fromCallable(() -> getKey(actionFact))
        .flatMap(cacheKey ->
            redisOperations.hasKey(cacheKey)
                .flatMap(aBoolean -> {
                    if (aBoolean) {
                        return redisOperations.opsForValue().get(cacheKey);
                    }
                    return authzService.getRolePermissions(actionFact)
                        .flatMap(policySetResponse ->
                            //save in cache
                        );
                })
                .elapsed()
                .flatMap(lambda -> {
                    LOG.info("cache/service processing key:{}, time:{}", cacheKey, lambda.getT1());
                    return Mono.just(lambda.getT2());
                }));
}
Output:
cache/service processing key:KEY1, time:3
cache/service processing key:KEY2, time:4
cache/service processing key:KEY3, time:18
cache/service processing key:KEY4, time:34
cache/service processing key:KEY5, time:46
cache/service processing key:KEY6, time:57
cache/service processing key:KEY7, time:70
cache/service processing key:KEY8, time:81
cache/service processing key:KEY9, time:91
cache/service processing key:KEY10, time:103
cache/service processing key:KEY11, time:112
cache/service processing key:KEY12, time:121
cache/service processing key:KEY13, time:134
cache/service processing key:KEY14, time:146
cache/service processing key:KEY15, time:159
I expected each cache request to take less than 5 milliseconds, like the first and second requests, but that is not the case. Does elapsed()
add the current fetch time to a cumulative total? As I understand it, each item emitted from the Flux is independent.
Mono#elapsed measures the time between when the Mono is subscribed to and the moment the Mono emits an item (onNext).
What causes the subscription and the start of the timer, in your case, is the outer parallelized flatMap that calls methodToReadFromCache.
What causes the onNext, and thus what is timed, is the combination of hasKey and the if/else part (redisOperations.opsForValue().get(cacheKey) vs authzService).
The outer flatMap should start at least as many timers as there are CPUs, since we're in parallel mode.
But the fact that the timings are skewed could hint at the fact that something is either blocking or has limited capacity. For example, could it be that the redisTemplate can only process a few keys at a time?
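To illustrate how limited capacity alone could produce steadily growing timings, here is a minimal sketch in plain JDK Java. It does not use Reactor or Redis: QueueingDemo, simulatedLookup and measure are hypothetical names, the 10 ms sleep is an assumed stand-in for a cache lookup, and the fixed thread pool stands in for whatever resource might be the bottleneck. Each timer starts when the request is submitted (analogous to elapsed() starting at subscription) and stops when the result is available.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class QueueingDemo {

    // Hypothetical stand-in for a cache lookup that takes about 10 ms.
    static void simulatedLookup() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits `requests` lookups at once to a pool with `capacity` workers and
    // returns, per request, the milliseconds from submission to completion.
    static List<Long> measure(int requests, int capacity) {
        ExecutorService pool = Executors.newFixedThreadPool(capacity);
        try {
            List<CompletableFuture<Long>> futures = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                // Timer starts at submission, not when a worker picks the task up.
                long start = System.nanoTime();
                futures.add(CompletableFuture.supplyAsync(() -> {
                    simulatedLookup();
                    return (System.nanoTime() - start) / 1_000_000;
                }, pool));
            }
            List<Long> elapsed = new ArrayList<>();
            for (CompletableFuture<Long> future : futures) {
                elapsed.add(future.join());
            }
            return elapsed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // One worker: every request waits behind the ones submitted before it.
        List<Long> times = measure(8, 1);
        for (int i = 0; i < times.size(); i++) {
            System.out.println("request " + (i + 1) + ", time:" + times.get(i));
        }
    }
}
```

With a capacity of 1, each measured time includes the wait behind earlier requests, so the values should climb from one request to the next even though every individual lookup takes about the same time. Raising the capacity toward the number of requests should flatten them, which is one way to check whether a bottleneck of this kind is behind the skewed numbers.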