I want to use Flux to limit the number of concurrent HTTP requests issued from a List of Mono.
Whenever some requests complete (their responses have arrived), the service should start further requests, so that at most 15 requests are in flight at any time.
A single request returns a list and, depending on the result, triggers further requests.
I want those follow-up requests to be sent with limited concurrency, because too many simultaneous HTTP requests from the consumer side put the target server in trouble.
I used flatMapMany as shown below.
public Flux<JsonNode> syncData() {
    return service1
        .getData(param1)
        .flatMapMany(res -> {
            List<Mono<JsonNode>> totalTask = new ArrayList<>();
            Map<String, Object> originData = service2.getDataFromDB(param2);
            res.withArray("data").forEach(row -> {
                String id = row.get("id").asText();
                if (originData.containsKey(id)) {
                    totalTask.add(service1.updateRequest(param3));
                } else {
                    totalTask.add(service1.deleteRequest(param4));
                }
                originData.remove(id);
            });
            for (left) { // pseudocode: for each entry left in originData
                totalTask.add(service1.createRequest(param5));
            }
            // Flux.merge(Iterable) subscribes to all sources eagerly
            return Flux.merge(totalTask);
        });
}
void syncData() {
    syncDataService.syncData().????;
}
I tried chaining .window(15), but it did not help: all the requests are still sent simultaneously.
How can I handle the Flux to achieve my goal?
I am afraid Project Reactor does not provide a built-in rate limiter or time-based limiter.
However, there are third-party libraries that provide this functionality and are compatible with Project Reactor. As far as I know, the resilience4j-reactor module supports it and is also compatible with Spring and Spring Boot.
The RateLimiterOperator checks if a downstream subscriber/observer can acquire a permission to subscribe to an upstream Publisher. If the rate limit would be exceeded, the RateLimiterOperator could either delay requesting data from the upstream or it can emit a RequestNotPermitted error to the downstream subscriber.
RateLimiter rateLimiter = RateLimiter.ofDefaults("name");
Mono.fromCallable(backendService::doSomething)
    .transformDeferred(RateLimiterOperator.of(rateLimiter));
More about the RateLimiter module itself here: https://resilience4j.readme.io/docs/ratelimiter
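To make the permission check described in the quote concrete, here is a minimal plain-Java sketch of a fixed-window limiter. It is not resilience4j's implementation or API: the class FixedWindowLimiter, its fields, and the tryAcquire method are hypothetical names chosen for illustration, and the clock value is passed in explicitly so the behaviour is deterministic.

```java
// Hypothetical illustration only; not part of resilience4j.
class FixedWindowLimiter {
    private final int limitForPeriod; // permits granted per period
    private final long periodNanos;   // length of one period
    private long windowStart;         // start time of the current period
    private int used;                 // permits handed out in the current period

    FixedWindowLimiter(int limitForPeriod, long periodNanos, long startNanos) {
        this.limitForPeriod = limitForPeriod;
        this.periodNanos = periodNanos;
        this.windowStart = startNanos;
    }

    /** Returns true if a call at nowNanos may proceed, false if the period's permits are used up. */
    synchronized boolean tryAcquire(long nowNanos) {
        if (nowNanos - windowStart >= periodNanos) {
            // A new period has begun: reset the permit counter.
            windowStart = nowNanos;
            used = 0;
        }
        if (used < limitForPeriod) {
            used++;
            return true;
        }
        return false;
    }
}
```

A caller that gets false back can either wait for the next period or fail the call, which mirrors the two options (delay or RequestNotPermitted) mentioned in the quote.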
You can use limitRate on a Flux. You will probably need to restructure your code a bit; see the docs here: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#limitRate-int-
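The goal in the question (at most 15 requests in flight, with a new one starting as soon as one finishes) can be illustrated without Reactor by a counting semaphore. The following is a minimal plain-JDK sketch, not Reactor code: BoundedConcurrencyDemo, runAll and simulateRequest are hypothetical names, and Thread.sleep stands in for an HTTP call. In Reactor itself, the flatMap(mapper, concurrency) and Flux.merge(source, concurrency) overloads express a comparable bound.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of bounded concurrency; names are not from the question.
class BoundedConcurrencyDemo {

    /**
     * Runs taskCount simulated requests with at most maxInFlight active at once
     * and returns the highest number of simultaneously active requests observed.
     */
    static int runAll(int taskCount, int maxInFlight) {
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                // Blocks the submitting thread until an in-flight request finishes.
                permits.acquireUninterruptibly();
                pending.add(CompletableFuture.runAsync(() -> {
                    try {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        simulateRequest();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // lets the next request start
                    }
                }, pool));
            }
            // Wait for the remaining requests to complete.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    // Stand-in for an HTTP call (assumption: about 20 ms per request).
    private static void simulateRequest() {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Calling BoundedConcurrencyDemo.runAll(40, 15) runs 40 simulated requests and returns the observed peak, which never exceeds 15 because a permit is taken before each request starts and returned only after it ends.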