According to the reference documentation, the groupBy operator splits a given Flux into multiple GroupedFlux depending on the keymapper function of the operator. If I execute the following code with a range of 257 integers, it works correctly, but not with 258:
    public void groupByTest() {
        Flux.range(1, 258)
            .groupBy(val -> val)
            .concatMap(g -> g.map(val -> val + "test"))
            .doOnNext(System.out::println)
            .blockLast();
    }
Does that mean that the groupBy operator cannot create more than 257 groups?
As stated in the groupBy javadoc:
The groups need to be drained and consumed downstream for groupBy to work correctly. Notably when the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream (eg. due to a flatMap with a maxConcurrency parameter that is set too low).
What that means is that once a group is emitted, groupBy needs to get more request for it to make progress. By default, it opens up to 256 groups and then it needs either more request or to detect that a group is complete. And groupBy cannot "know" if a group is complete until either:
prefetch, or if groupBy received onComplete signal from the source)
Both the val -> val criteria and concatMap work against these requirements.
The groupBy criteria ultimately generates as many groups as there are values. Here 258 groups, vs a default capacity for groupBy to keep track of 256 groups.
Note: If the whole sequence produces fewer than 256 groups, it would work fine. Try setting the criteria to val -> val % 2 and see that it works. Then try to bump the range to range(1, 513) and see how it hangs again.
The last test was limited to 512 elements due to how concatMap works.
concatMap is especially bad in our case, because it will only subscribe to the next group and make progress when the first group has completed. This clashes with condition B) above, creating a situation where neither groupBy nor concatMap can make progress.
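The arithmetic behind these numbers can be sketched with a toy model in plain Java. This is not Reactor's implementation: the class GroupByStallModel and its completes method are hypothetical names, and the model rests on simplifying assumptions taken from the explanation above, namely that groupBy stops requesting from the source once prefetch (256) elements sit in groups nobody has subscribed to, that a group only completes when the source completes, and that the source signals completion right after its last element. A concurrency of 1 mimics concatMap; a larger value mimics a consumer that subscribes to several groups at once.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.IntUnaryOperator;

public class GroupByStallModel {

    /**
     * Toy model (assumption, not Reactor code): returns true if the modeled
     * pipeline reaches completion, false if it stalls before the source ends.
     */
    public static boolean completes(int count, IntUnaryOperator keyMapper,
                                    int concurrency, int prefetch) {
        Set<Integer> seenKeys = new HashSet<>();       // groups opened so far
        Set<Integer> subscribedKeys = new HashSet<>(); // groups the consumer drains
        int buffered = 0;                              // elements stuck in unsubscribed groups

        for (int value = 1; value <= count; value++) {
            // No room left to request another element, and no group can
            // complete because the source has not completed: the model stalls.
            if (buffered == prefetch) {
                return false;
            }
            int key = keyMapper.applyAsInt(value);
            // The consumer subscribes to new groups only while it has free slots.
            if (seenKeys.add(key) && subscribedKeys.size() < concurrency) {
                subscribedKeys.add(key);
            }
            // Elements of subscribed groups are consumed at once; others pile up.
            if (!subscribedKeys.contains(key)) {
                buffered++;
            }
        }
        // The source completed, so every group completes and can be drained.
        return true;
    }

    public static void main(String[] args) {
        int prefetch = 256;
        System.out.println(completes(257, v -> v, 1, prefetch));     // true
        System.out.println(completes(258, v -> v, 1, prefetch));     // false
        System.out.println(completes(512, v -> v % 2, 1, prefetch)); // true
        System.out.println(completes(513, v -> v % 2, 1, prefetch)); // false
    }
}
```

Under these assumptions the model reproduces the thresholds from the question and the note: 257 distinct keys pass and 258 stall, while with two keys 512 elements pass and 513 stall.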
Note: In the small example with 513, concatMap would start consuming group 1 and wait for it to complete before it consumes group 2. BUT groupBy stops emitting once it has fetched 256 elements for group 1 and then waits for downstream to start consuming group 2. As a result, it has too little data to detect that the group is complete, concatMap waits for that completion signal and never subscribes to group 2, hanging the whole thing.
Using a flatMap would fix that, because flatMap will subscribe to multiple groups concurrently, and 2 groups is no trouble for it: it will consume both groups and make progress.
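As a minimal sketch of that fix, the two-group example could look like the following. It assumes reactor-core is on the classpath; the class name GroupByFlatMapExample and the helper run are made up for illustration, and the 5 second timeout is only a safety net so that a hang would surface as an exception instead of blocking forever.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class GroupByFlatMapExample {

    // Hypothetical helper: groups by parity and consumes both groups with flatMap.
    public static List<String> run(int count) {
        return Flux.range(1, count)
                .groupBy(val -> val % 2)                           // only 2 groups
                .flatMap(group -> group.map(val -> val + "test"))  // both groups subscribed concurrently
                .collectList()
                .block(Duration.ofSeconds(5));                     // fail fast rather than hang
    }

    public static void main(String[] args) {
        // 513 is the size at which the concatMap variant was described as hanging.
        System.out.println(run(513).size());
    }
}
```

With the original val -> val criteria there are as many groups as values, so the javadoc warning quoted earlier still applies: flatMap's concurrency has to be high enough for the number of groups that stay open.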