As far as I know, parallel streams use the default ForkJoinPool.commonPool, which by default has one fewer thread than you have processors. I want to use my own custom thread pool, like this:
@Test
public void stream() throws Exception {
    //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
    ForkJoinPool pool = new ForkJoinPool(10);
    List<Integer> testList = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
    long start = System.currentTimeMillis();
    List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
        try {
            // read from database
            Thread.sleep(1000);
            System.out.println("task" + item + ":" + Thread.currentThread());
        } catch (Exception e) {
        }
        return item * 10;
    })).get().collect(Collectors.toList());
    System.out.println(result);
    System.out.println(System.currentTimeMillis() - start);
}
The result shows that my custom ForkJoinPool is never used.
If I instead change the default parallelism like this:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
it works well: all the tasks take only about 1 second in total.
In my application the task involves heavy I/O (reading data from a database), so I need higher parallelism, but I do not want to change the JVM property.
So what is the right way to specify my own ForkJoinPool? Or, more generally, how should parallel streams be used in I/O-intensive situations?
Streams are lazy; all work is done when you commence a terminal operation. In your case, the terminal operation is .collect(Collectors.toList()), which you call in the main thread on the result of get(). Therefore, the actual work will be done the same way as if you'd constructed the entire stream in the main thread.
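A minimal sketch of that laziness, assuming Java 9+ for List.of; the names LazinessDemo and buildPipeline are illustrative only. A counter incremented inside the map() lambda stays at 0 after the pipeline is built and only reaches 4 once collect() runs:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazinessDemo {
    // Builds the pipeline but does not run it: map() is an intermediate operation.
    static Stream<Integer> buildPipeline(List<Integer> input, AtomicInteger calls) {
        return input.parallelStream().map(item -> {
            calls.incrementAndGet(); // counts how often the mapping lambda actually runs
            return item * 10;
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Stream<Integer> stream = buildPipeline(List.of(1, 2, 3, 4), calls);
        System.out.println("calls before collect: " + calls.get());

        // The terminal operation triggers evaluation, driven by the thread that calls collect().
        List<Integer> result = stream.collect(Collectors.toList());
        System.out.println("calls after collect: " + calls.get());
        System.out.println(result);
    }
}
```

So whichever thread calls the terminal operation is the one that drives the parallel evaluation.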
For your pool to have an effect, you have to move the terminal operation into the submitted task:
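import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Arrays.asList(
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
// The whole pipeline, including the terminal collect(), runs inside the submitted task
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
    }
    return item * 10;
}).collect(Collectors.toList())).join();
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);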
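To check which threads actually do the work, the following sketch records every thread that runs the map() lambda and tests whether each one is a ForkJoinWorkerThread whose getPool() is the custom pool. The names CustomPoolCheck and allWorkInPool are hypothetical, and, like the behavior it observes, the outcome reflects the current JDK implementation rather than a documented guarantee:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CustomPoolCheck {
    // Returns true if every thread that ran the map() lambda is a worker of `pool`.
    static boolean allWorkInPool(ForkJoinPool pool, boolean terminalOpInsideTask) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        List<Integer> input = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());

        if (terminalOpInsideTask) {
            // Fixed version: collect() is called inside the task running in the pool.
            pool.submit(() -> input.parallelStream()
                    .map(i -> { threads.add(Thread.currentThread()); return i * 10; })
                    .collect(Collectors.toList())).join();
        } else {
            // Question's version: only the lazy stream is built in the pool;
            // collect() then runs on the calling thread.
            pool.submit(() -> input.parallelStream()
                    .map(i -> { threads.add(Thread.currentThread()); return i * 10; }))
                    .join()
                    .collect(Collectors.toList());
        }
        return threads.stream().allMatch(t ->
                t instanceof ForkJoinWorkerThread
                        && ((ForkJoinWorkerThread) t).getPool() == pool);
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(10);
        try {
            System.out.println("collect outside task, all in custom pool: " + allWorkInPool(pool, false));
            System.out.println("collect inside task, all in custom pool: " + allWorkInPool(pool, true));
        } finally {
            pool.shutdown();
        }
    }
}
```

With current JDKs one would expect the question's variant to report false (the work lands on the calling thread and common-pool workers) and the fixed variant to report true, since ForkJoinTask.fork() places subtasks in the pool the current task is running in.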
We can also demonstrate the relevance of the terminal operation by constructing the stream in the main thread and only submitting the terminal operation to the pool:
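// Build the stream in the calling thread; nothing is evaluated yet
Stream<Integer> stream = testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
    }
    return item * 10;
});
// Only the terminal operation is submitted, yet the work still runs in the pool
List<Integer> result = pool.submit(() -> stream.collect(Collectors.toList())).join();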
Keep in mind, however, that this is undocumented behavior and is not guaranteed. The real answer is that the Stream API in its current form, with no thread control (and no help for dealing with checked exceptions), is not suitable for parallel I/O operations.
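One commonly used alternative for blocking I/O, not taken from the original answer, is to skip parallel streams and hand each call to an executor you size yourself, via CompletableFuture.supplyAsync(Supplier, Executor). A sketch, where readFromDatabase is a hypothetical stand-in for the real query and the pool size of 20 is an arbitrary assumption:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class IoFanOut {
    // Hypothetical stand-in for a blocking database read.
    static int readFromDatabase(int item) {
        try {
            Thread.sleep(100); // simulated I/O latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return item * 10;
    }

    static List<Integer> fetchAll(List<Integer> items, ExecutorService executor) {
        // First pass: start every blocking call on the dedicated executor.
        List<CompletableFuture<Integer>> futures = items.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> readFromDatabase(item), executor))
                .collect(Collectors.toList());
        // Second pass: wait for all results, keeping the original order.
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(20); // assumed size
        try {
            List<Integer> items = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
            long start = System.currentTimeMillis();
            List<Integer> result = fetchAll(items, executor);
            System.out.println(result);
            System.out.println(System.currentTimeMillis() - start);
        } finally {
            executor.shutdown();
        }
    }
}
```

The futures are collected into a list before any join() call; fusing both steps into one sequential pipeline would start and wait for each call in turn, losing the parallelism. Checked exceptions from the real query still have to be handled inside the supplier.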