
Collect results from parallel stream

I have a piece of code like this:

List<Egg> eggs = hens.parallelStream().map(hen -> {
    ArrayList<Egg> henEggs = new ArrayList<>();
    while (hen.hasEgg()) {
        henEggs.add(hen.getEgg());
    }
    return henEggs;
}).flatMap(Collection::stream).collect(Collectors.toList());

But this way I have to create an ArrayList for every hen, and no eggs are collected until a hen has been fully processed. I would like something like this:

List<Egg> eggs = hens.parallelStream().map(hen -> {
    while (hen.hasEgg()) {
        yield return hen.getEgg();
    }
}).collect(Collectors.toList());

But Java does not have yield return. Is there a way to implement it?

Fan asked Oct 20 '25 22:10


2 Answers

Your Hen class is poorly adapted to the Stream API. Provided that you cannot change it and it has no other useful methods (like Collection<Egg> getAllEggs() or Iterator<Egg> eggIterator()), you can create an egg stream like this:

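import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Adapts the stateful hasEgg()/getEgg() pair to a lazy, sequential Stream<Egg>.
public static Stream<Egg> eggs(Hen hen) {
    Iterator<Egg> it = new Iterator<Egg>() {
        @Override
        public boolean hasNext() {
            return hen.hasEgg();
        }

        @Override
        public Egg next() {
            // The Iterator contract requires an exception once the source is exhausted.
            if (!hen.hasEgg()) {
                throw new NoSuchElementException();
            }
            return hen.getEgg();
        }
    };
    // Unknown size, no characteristics; false means a sequential stream.
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false);
}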

Now you can use it in the following manner:

List<Egg> eggs = hens.parallelStream()
                     .flatMap(hen -> eggs(hen))
                     .collect(Collectors.toList());

Of course, a better Stream implementation may be possible if you can change the Hen class.
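On Java 9 or later, a similar lazy per-hen stream can be sketched without a hand-written Iterator, using the three-argument Stream.iterate(seed, hasNext, next): it keeps emitting the same hen while hasEgg() returns true, and map(Hen::getEgg) pulls exactly one egg per emitted element. The Hen and Egg classes below are hypothetical stand-ins (a hen with a fixed egg count) that only mimic the hasEgg()/getEgg() pair from the question.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class IterateEggs {
    // Hypothetical stand-in for the question's Egg class.
    static final class Egg {
        private final String label;
        Egg(String label) { this.label = label; }
        @Override public String toString() { return label; }
    }

    // Hypothetical stand-in for the question's Hen class: starts with a
    // fixed number of eggs and hands them out one at a time.
    static final class Hen {
        private final String name;
        private int remaining;
        private int laid;

        Hen(String name, int eggs) {
            this.name = name;
            this.remaining = eggs;
        }

        boolean hasEgg() { return remaining > 0; }

        Egg getEgg() {
            if (remaining == 0) throw new IllegalStateException("no egg left");
            remaining--;
            laid++;
            return new Egg(name + "#" + laid);
        }
    }

    // Lazy egg stream for one hen: iterate() re-emits the same hen as long as
    // hasEgg() is true, and map() takes exactly one egg per emitted element.
    static Stream<Egg> eggs(Hen hen) {
        return Stream.iterate(hen, Hen::hasEgg, h -> h).map(Hen::getEgg);
    }

    public static void main(String[] args) {
        List<Hen> hens = List.of(new Hen("a", 3), new Hen("b", 0), new Hen("c", 2));
        List<Egg> eggs = hens.parallelStream()
                             .flatMap(IterateEggs::eggs)
                             .collect(Collectors.toList());
        System.out.println(eggs); // [a#1, a#2, a#3, c#1, c#2]
    }
}
```

In the OpenJDK implementation, flatMap consumes each inner stream sequentially, so hasEgg() and getEgg() are still called alternately by a single thread per hen; the parallelism comes only from different hens being handled by different threads.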

Tagir Valeev answered Oct 22 '25 11:10


The iteration logic using hasEgg() and getEgg() is stateful, as these methods’ results depend on previous invocations. Therefore, processing a single Hen can’t be parallelized unless you manage to change the interface completely.
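To make the last point concrete, here is a sketch of what a completely changed interface might look like. IndexedHen, eggCount() and egg(int) are hypothetical names, not part of the question's Hen class, and eggs are plain strings to keep the example short. Because egg(int) keeps no cursor between calls, the indices of a single hen can be split across threads with IntStream.range(...).parallel().

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class IndexedHenEggs {
    // Hypothetical redesigned hen: instead of the stateful hasEgg()/getEgg()
    // pair, it reports its egg count and returns any egg by index. egg(int)
    // keeps no state between calls, so it can be called from any thread.
    interface IndexedHen {
        int eggCount();
        String egg(int index);
    }

    // IntStream.range is backed by a sized, evenly splittable spliterator, so a
    // parallel stream can hand different index ranges of ONE hen to different threads.
    static Stream<String> eggs(IndexedHen hen) {
        return IntStream.range(0, hen.eggCount()).parallel().mapToObj(hen::egg);
    }

    public static void main(String[] args) {
        IndexedHen hen = new IndexedHen() {
            @Override public int eggCount() { return 5; }
            @Override public String egg(int index) { return "egg" + index; }
        };
        // collect() still returns the eggs in index (encounter) order.
        List<String> eggs = eggs(hen).collect(Collectors.toList());
        System.out.println(eggs); // [egg0, egg1, egg2, egg3, egg4]
    }
}
```

Note that if such a stream is returned from a flatMap mapper, the OpenJDK implementation consumes it sequentially anyway, so the per-hen split only pays off when a single hen is processed on its own.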

That said, your worry about the ArrayList is unnecessary. When the stream implementation executes the collect operation in parallel, it has to buffer the values for each thread anyway and combine these buffers afterwards. It might even turn out that the operation doesn’t benefit from parallel execution at all.
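The buffering described above can be sketched with a hand-written collector. toBufferedList() is a hypothetical name; it mirrors the supplier/accumulator/combiner structure that a list collector such as Collectors.toList() is built from. In a parallel collect, each subtask gets its own ArrayList from the supplier and appends to it, and the combiner merges the partial lists, so per-thread buffers exist whether or not you create an ArrayList per hen yourself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class BufferingCollector {
    // Hand-written list collector (toBufferedList is a hypothetical name).
    // In a parallel collect, every subtask calls the supplier to obtain its own
    // buffer, appends to it with the accumulator, and the combiner then merges
    // adjacent buffers, left before right, which preserves encounter order.
    static <T> Collector<T, ?, List<T>> toBufferedList() {
        return Collector.<T, List<T>>of(
                ArrayList::new,        // one fresh buffer per subtask
                List::add,             // append one element to the current buffer
                (left, right) -> {     // merge two partial buffers
                    left.addAll(right);
                    return left;
                });
    }

    public static void main(String[] args) {
        List<Integer> squares = IntStream.range(0, 10).parallel().boxed()
                .map(i -> i * i)
                .collect(toBufferedList());
        System.out.println(squares); // [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    }
}
```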

What you can do is replace the ArrayList with a Stream.Builder, as it’s optimized for the use case of only adding elements until the Stream is constructed:

List<Egg> eggs = hens.parallelStream().flatMap(hen -> {
    Stream.Builder<Egg> eggStream = Stream.builder();
    while (hen.hasEgg()) {
        eggStream.add(hen.getEgg());
    }
    return eggStream.build();
}).collect(Collectors.toList());

Holger answered Oct 22 '25 11:10


