I have some fairly simple code that reads a bunch of Strings and applies a filter. I expected the filter to run on multiple threads.
Iterable<String> outputs = Observable
.from(Files.readLines(new File("E:\\SAMA\\Test\\ImageNetBullets.txt"), Charset.forName("utf-8")))
.take(20).subscribeOn(Schedulers.from(threadPoolExecutor)).filter(str -> isURLOK(str))
.toBlocking().toIterable();
From the logs, it seems that the Filter method is running on just 1 thread:
In Thread pool-1-thread-1
In Thread pool-1-thread-1
http://farm2.static.flickr.com/1258/1479683334_3ff920d217.jpg
In Thread pool-1-thread-1
In Thread pool-1-thread-1
How do I speed it up?
RxJava is sequential by nature. For example, using map(Func1), the Func1 itself will be executed non-concurrently with the values passing through the parent sequence:
Observable.range(1, 10).map(v -> v * v).subscribe(System.out::println);
Here, the lambda v -> v * v will be called with the values 1 through 10 sequentially, one at a time.
RxJava can be asynchronous in the sense that the stages of a pipeline (range->map->subscribe) can run concurrently, or in parallel, relative to each other. For example:
Observable.range(1, 10)
.subscribeOn(Schedulers.computation())
.map(v -> v * v) // (1)
.observeOn(Schedulers.io())
.map(v -> -v) // (2)
.toBlocking()
.subscribe(System.out::println); // (3)
Here, (1) may run in parallel with (2) and (3); i.e., while (1) is already calculating 5 * 5, (2) may be negating 9 (the square of 3) and (3) may be printing -1 at the same time. Each individual stage still processes its values one after another.
If you want to process the elements of the sequence concurrently, you have to "fork out" the sequence into sub-Observables, then join the results back with flatMap:
Observable.range(1, 10)
.flatMap(v ->
Observable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w) // renamed from v: a nested lambda cannot redeclare the enclosing lambda's parameter
)
.toBlocking()
.subscribe(System.out::println);
Here, each value v will start a new Observable that runs on a background thread and computes via map() there. v = 1 may run on thread 1, v = 2 may run on thread 2, and v = 3 may run on thread 1 again, but strictly after v = 1 has been computed.
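Applied to the filter from the question, the same fork-and-join idea might look like the sketch below. It assumes RxJava 1.x (the rx.* packages, as in the question). The class name ParallelFilterSketch, the helper filterConcurrently, and the body of isURLOK are hypothetical stand-ins, since the question does not show the real check.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class ParallelFilterSketch {

    // Hypothetical stand-in for the question's isURLOK(String);
    // the real implementation (probably a network check) is not shown there.
    static boolean isURLOK(String url) {
        return url.startsWith("http://");
    }

    // Wraps every element in its own inner Observable, runs the predicate
    // on the given scheduler, and merges the surviving elements back together.
    static List<String> filterConcurrently(List<String> lines, Scheduler scheduler) {
        return Observable.from(lines)
                .flatMap(str -> Observable.just(str)
                        .subscribeOn(scheduler) // inner sequence is subscribed on a pool thread
                        .filter(ParallelFilterSketch::isURLOK))
                .toList()       // collect everything that passed the filter
                .toBlocking()
                .single();      // block until the merged sequence completes
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<String> ok = filterConcurrently(
                    Arrays.asList("http://a.example/1.jpg", "ftp://b.example/2.jpg"),
                    Schedulers.from(pool));
            System.out.println(ok);
        } finally {
            pool.shutdown(); // otherwise the pool's non-daemon threads keep the JVM alive
        }
    }
}
```

Note that flatMap merges results as they arrive, so the output order is not guaranteed to match the input order. If the order of the lines matters, the results would have to be re-sorted or paired with their index.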