
How to specify ForkJoinPool for Java 8 parallel stream?

As far as I know, parallel streams use the default ForkJoinPool.commonPool, which by default has one fewer thread than there are processors. I want to use my own custom thread pool.

Like this:

@Test
public void stream() throws Exception {
    //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
    ForkJoinPool pool = new ForkJoinPool(10);
    List<Integer> testList = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
    long start = System.currentTimeMillis();
    List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
        try {
            // read from database
            Thread.sleep(1000);
            System.out.println("task" + item + ":" + Thread.currentThread());
        } catch (Exception e) {
        }
        return item * 10;
    })).get().collect(Collectors.toList());
    System.out.println(result);
    System.out.println(System.currentTimeMillis() - start);
}

And the result: [screenshot of the console output]

My custom ForkJoinPool is never used. If I change the default parallelism instead, like this:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

It works well: all the tasks finish in about 1 second.
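To see which parallelism the common pool actually ends up with (with or without the property), a small sketch like the following can help. It only uses the standard `ForkJoinPool.getCommonPoolParallelism()` and `getParallelism()` methods; the class and helper names are illustrative. Because the common pool is created once, when `ForkJoinPool` is first initialized, the property only has an effect if it is set before that happens (for example with `-D` on the command line). Note also that the thread that starts a parallel stream's terminal operation typically takes part in the work, so a stream on the default common pool usually runs on about as many threads as there are processors.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {
    // Target parallelism of the shared common pool. Unless the
    // java.util.concurrent.ForkJoinPool.common.parallelism property was set
    // before the pool was created, this is normally availableProcessors() - 1,
    // but never less than 1.
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    // Parallelism of a pool created explicitly with the requested target.
    static int customParallelism(int requested) {
        ForkJoinPool pool = new ForkJoinPool(requested);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("availableProcessors    = " + Runtime.getRuntime().availableProcessors());
        System.out.println("commonPool parallelism = " + commonParallelism());
        System.out.println("custom pool (10)       = " + customParallelism(10));
    }
}
```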

In my application, each task performs heavy I/O (reading data from a database), so I need higher parallelism, but I do not want to change the JVM-wide property.

So what is the right way to specify my own ForkJoinPool?

Or, how should parallel streams be used in I/O-intensive situations?

asked Sep 12 '18 04:09 by Jerry Zhou


1 Answer

Streams are lazy; all work is done when you commence a terminal operation. The task you submit to the pool only builds the pipeline and returns the not-yet-evaluated Stream. The terminal operation, .collect(Collectors.toList()), is then called in the main thread on the result of get(). Therefore, the actual work is done the same way as if you had constructed the entire stream in the main thread.
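The laziness can be made visible with a counter. In this minimal sketch (class and method names are illustrative), `buildPipeline` plays the role of the lambda submitted in the question: it returns a Stream without running the `map` function, and only the later `collect` call does the work.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyStreamDemo {
    // Counts how many times the mapping function has run so far.
    static final AtomicInteger MAP_CALLS = new AtomicInteger();

    // Builds the pipeline but does not run it: there is no terminal operation yet.
    static Stream<Integer> buildPipeline(List<Integer> input) {
        return input.parallelStream().map(item -> {
            MAP_CALLS.incrementAndGet();
            return item * 10;
        });
    }

    public static void main(String[] args) {
        Stream<Integer> stream = buildPipeline(Arrays.asList(1, 2, 3, 4));
        // The map lambda has not been invoked yet
        System.out.println("after building: " + MAP_CALLS.get());
        // The terminal operation triggers all the work; since it is called from
        // main, it runs in main plus the common pool, not in any other pool
        List<Integer> result = stream.collect(Collectors.toList());
        System.out.println("after collect:  " + MAP_CALLS.get() + " " + result);
    }
}
```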

For your pool to have an effect, you have to move the terminal operation into the submitted task:

ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Arrays.asList(
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
    }
    return item * 10;
}).collect(Collectors.toList())).join();
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);
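To check which pool actually runs the work, each element can record the pool of the thread that processes it. This sketch relies on the standard static `ForkJoinTask.getPool()`, which returns the pool of the current worker thread or `null` outside a ForkJoin worker; the class name and the "custom"/"common"/"none" labels are illustrative. It depends on the same undocumented behavior discussed below, so it shows what current JDKs do rather than what is guaranteed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;

public class CustomPoolCheck {
    // Runs the whole pipeline, terminal operation included, inside `pool`
    // and records which pool each element was processed in.
    static List<Integer> runIn(ForkJoinPool pool, List<Integer> input, Set<String> poolsSeen) {
        return pool.submit(() -> input.parallelStream().map(item -> {
            // Pool of the current worker thread, or null for a non-ForkJoin thread
            ForkJoinPool current = ForkJoinTask.getPool();
            poolsSeen.add(current == pool ? "custom"
                    : current == ForkJoinPool.commonPool() ? "common" : "none");
            return item * 10;
        }).collect(Collectors.toList())).join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(10);
        Set<String> poolsSeen = ConcurrentHashMap.newKeySet();
        List<Integer> result = runIn(pool, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), poolsSeen);
        pool.shutdown();
        System.out.println(result);
        System.out.println("pools used: " + poolsSeen);
    }
}
```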

We can also demonstrate the relevance of the terminal operation by constructing the stream in the main thread and only submitting the terminal operation to the pool:

Stream<Integer> stream = testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
    }
    return item * 10;
});
List<Integer> result = pool.submit(() -> stream.collect(Collectors.toList())).join();

Keep in mind, however, that this is undocumented behavior: nothing guarantees that a parallel stream whose terminal operation is started inside a custom ForkJoinPool will run in that pool. The real answer is that the Stream API in its current form, with no thread control (and no help for dealing with checked exceptions), is not suitable for parallel I/O operations.
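One common alternative for blocking I/O, not part of the original answer, is to size an ordinary executor for the expected wait and submit one `CompletableFuture` per item. In this sketch, `readFromDatabase` is a hypothetical stand-in for the real query (it just sleeps 100 ms), and the pool size of 20 is an arbitrary example value. The futures are collected into a list before any `join()`; because streams are lazy, chaining `map(CompletableFuture::join)` directly onto the submitting stream would wait for each task before submitting the next.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class BlockingIoFanOut {
    // Hypothetical stand-in for a blocking database read.
    static int readFromDatabase(int id) throws InterruptedException {
        Thread.sleep(100);
        return id * 10;
    }

    // Runs one blocking call per id on the given executor and returns the
    // results in input order.
    static List<Integer> fetchAll(List<Integer> ids, ExecutorService executor) {
        List<CompletableFuture<Integer>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return readFromDatabase(id);
                    } catch (InterruptedException e) {
                        // Checked exceptions must be handled or wrapped inside the Supplier
                        Thread.currentThread().interrupt();
                        throw new CompletionException(e);
                    }
                }, executor))
                .collect(Collectors.toList()); // submit every task before waiting on any
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Pool sized for I/O wait rather than CPU count (20 is an example value)
        ExecutorService executor = Executors.newFixedThreadPool(20);
        try {
            List<Integer> ids = Arrays.asList(
                    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
            long start = System.currentTimeMillis();
            List<Integer> result = fetchAll(ids, executor);
            System.out.println(result);
            System.out.println((System.currentTimeMillis() - start) + " ms");
        } finally {
            executor.shutdown();
        }
    }
}
```

Unlike the custom-ForkJoinPool trick, which thread pool runs the work here is part of the documented contract of `supplyAsync(Supplier, Executor)`.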

answered Oct 12 '22 19:10 by Holger