When using the reduce() operation on a parallel stream, the OCP exam book states that there are certain principles the reduce() arguments must adhere to. Those principles are the following:
- The identity must be defined such that for all elements in the stream u, combiner.apply(identity, u) is equal to u.
- The accumulator operator op must be associative and stateless such that
(a op b) op cis equal toa op (b op c).- The combiner operator must also be associative and stateless and compatible with the identity, such that for all of
uandtcombiner.apply(u, accumulator.apply(identity, t))is equal toaccumulator.apply(u,t).
The book gives two examples to illustrate these principles, please see the code below:
example for associative:
System.out.println(
        Arrays.asList(1, 2, 3, 4, 5, 6)
                .parallelStream()
                .reduce(0, (a, b) -> (a - b)));
What the book says about this:
It may output -21, 3, or some other value as the accumulator function violates the associativity property.
example for the identity requirement:
System.out.println(
        Arrays.asList("w", "o", "l", "f")
                .parallelStream()
                .reduce("X", String::concat));
What the book says about this:
You can see other problems if we use an identity parameter that is not truly an identity value. It can output
XwXoXlXf. As part of the parallel process, the identity is applied to multiple elements in the stream, resulting in very unexpected data.
I don't understand those examples. With the accumulator example the accumulator starts with 0 - 1 which is -1, then -1 - 2 which is -3, then -6 etc all the way to -21. I understand that, because the generated arraylist isn't synchronized the results maybe be unpredictable because of the possibility of race conditions etc, but why isn't the accumulator associative? Wouldn't (a+b) cause unpredictable results too? I really don't see what's wrong with the accumulator being used in the example and why it's not associative, but then again I still don't exactly understand what "associative principle" means.
I don't understand the identity example either. I understand that the result could indeed be XwXoXlXf if 4 separate threads were to start accumulating with the identity at the same time, but what does that have to do with the identity parameter itself? What exactly would be a proper identity to use then?
I was wondering if anyone could enlighten me a bit more on these principles.
Thank you
Reducing is the repeated process of combining all elements. reduce operation applies a binary operator to each element in the stream where the first argument to the operator is the return value of the previous application and second argument is the current stream element.
In Java, reducing is a terminal operation that aggregates a stream into a type or a primitive type. Java 8 provides Stream API contains set of predefined reduction operations such as average(), sum(), min(), max(), and count(). These operations return a value by combining the elements of a stream.
A reduction is a terminal operation that aggregates a stream into a type or a primitive. The Java 8 Stream API contains a set of predefined reduction operations, such as average , sum , min , max , and count , which return one value by combining the elements of a stream.
Identity – an element that is the initial value of the reduction operation and the default result if the stream is empty. Accumulator – a function that takes two parameters: a partial result of the reduction operation and the next element of the stream.
why isn't the accumulator associative?
It's not associative since the order of subtraction operations determines the final result.
If you run a serial Stream, you'll get the expected result of:
0 - 1 - 2 - 3 - 4 - 5 - 6 = -21
On the other hand, for parallel Streams, the work is split to multiple threads. For example, if reduce is executed in parallel on 6 threads, and then the intermediate results are combined, you can get a different result:
0 - 1   0 - 2   0 - 3      0 - 4     0 - 5    0 - 6
  -1     -2      -3         -4        -5        -6
  -1 - (-2)         -3 - (-4)          -5 - (-6)
      1                 1                  1
           1   -   1
               0            -     1
                        -1
Or, to make a long example short:
(1 - 2) - 3 = -4
1 - (2 - 3) =  2
Therefore subtraction is not associative.
On the other hand, a+b doesn't cause the same problem, since addition is an associative operator (i.e. (a+b)+c == a+(b+c)).
The problem with the identity example is that when reduce is executed in parallel on multiple threads, "X" is appended to the starts of each intermediate result.
What exactly would be a proper identity to use then?
If you change the identity value to "" :
System.out.println(Arrays.asList("w","o","l","f"))
.parallelStream()
.reduce("", String::concat));
you'll get "wolf" instead of "XwXoXlXf".
Let me give two examples. First where the identity is broken:
int result = Stream.of(1, 2, 3, 4, 5, 6)
        .parallel()
        .reduce(10, (a, b) -> a + b);
System.out.println(result); // 81 on my run
Basically you have broken this rule: The identity value must be an identity for the accumulator function.  This means that for all u, accumulator(identity, u) is equal to u.
Or to make is simpler, let's see if that rule holds for some random data from our Stream:
 Integer identity = 10;
 BinaryOperator<Integer> combiner = (x, y) -> x + y;
 boolean identityRespected = combiner.apply(identity, 1) == 1;
 System.out.println(identityRespected); // prints false
And a second example:
/**
 * count letters, adding a bit more all the time
 */
private static int howMany(List<String> tokens) {
    return tokens.stream()
            .parallel()
            .reduce(0, // identity
                    (i, s) -> { // accumulator
                        return s.length() + i;
                    }, (left, right) -> { // combiner
                        return left + right + left; // notice the extra left here
                    });
}
And you invoke this with:
List<String> left = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee");
List<String> right = Arrays.asList("aa", "bbb", "cccc", "ddddd", "eeeeee", "");
System.out.println(howMany(left));  // 38 on my run
System.out.println(howMany(right)); // 50 on my run
Basically you have broken this rule: Additionally, the combiner function must be compatible with the accumulator function or in code :
// this must hold!
// combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
Integer identity = 0;
String t = "aa";
Integer u = 3; // "bbb"
BiFunction<Integer, String, Integer> accumulator = (Integer i, String s) -> i + s.length();
BinaryOperator<Integer> combiner = (left, right) -> left + right + left;
int first = accumulator.apply(identity, t); // 2
int second = combiner.apply(u, first); // 3 + 2 + 3 = 8
Integer shouldBe8 = accumulator.apply(u, t);
System.out.println(shouldBe8 == second); // false
While the question has already been answered and accepted, I think it can be answered in a simpler, more practical way.
If you don't have a valid identity and an associative accumulator/combiner, the result of the reduce operation will depend on:
Stream contentStream
Let's try with an example for non-associative accumulator/combiner (basically, we reduce a list of 50 numbers in a sequence and in parallel by varying the number of threads):
System.out.println("sequential: reduce="+
    IntStream.rangeClosed(1, 50).boxed()
        .reduce(
            0, 
            (a,b)->a-b, 
            (a,b)->a-b));
for (int n=1; n<6; n++) {
    ForkJoinPool pool = new ForkJoinPool(n);
    final int finalN = n;
    try {
        pool.submit(()->{
            System.out.println(finalN+" threads : reduce="+
                IntStream.rangeClosed(1, 50).boxed()
                    .parallel()
                    .reduce(
                        0, 
                        (a,b)->a-b, 
                        (a,b)->a-b));
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }
This displays the following results (Oracle JDK 10.0.1) :
sequential: reduce=-1275
1 threads : reduce=325
2 threads : reduce=-175
3 threads : reduce=-25
4 threads : reduce=75
5 threads : reduce=-25
This shows that the result depends on the number of threads involved in the reduce calculation.
Notes:
Stream content and the same number of threads always leads to the same reduced value when ran several times. I suppose this is because the parallel stream uses a deterministic Spliterator.ForkJoinPool of 1,2,3,4 or 5 threads.For identity, as Eran wrote with the "XwXoXlXf" example, with 4 threads, each thread will start by using the identity as a kind of String prefix. But pay attention : while the OCP book suggests that "" and 0 are valid identity, it depends on the accumulator/combiner functions. For example:
0 is a valid identity for accumulator (a,b)->a+b (because a+0=a)1 is a valid identity for accumulator (a,b)->a*b (because a*1=a, but 0 is not valid because a*0=0!)If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With