
Parallelism in RxJava - Filter

I have some pretty simple code that reads a bunch of Strings and applies a filter. I expected the filter to run on multiple threads.

    Iterable<String> outputs = Observable
            .from(Files.readLines(new File("E:\\SAMA\\Test\\ImageNetBullets.txt"), Charset.forName("utf-8")))
            .take(20).subscribeOn(Schedulers.from(threadPoolExecutor)).filter(str -> isURLOK(str))
            .toBlocking().toIterable();

From the logs, it seems that the filter is running on just one thread:

    In Thread pool-1-thread-1
    In Thread pool-1-thread-1
    http://farm2.static.flickr.com/1258/1479683334_3ff920d217.jpg
    In Thread pool-1-thread-1
    In Thread pool-1-thread-1

How do I speed it up?

asked Sep 08 '25 18:09 by user2849678


1 Answer

RxJava is sequential by nature. For example, with map(Func1), the Func1 is invoked for one value at a time, never concurrently, as the values pass through the upstream sequence:

    Observable.range(1, 10).map(v -> v * v).subscribe(System.out::println);

Here, the lambda v -> v * v is called with the values 1 through 10, one after another.
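
Java's own sequential streams follow the same rule, so the behavior is easy to check without RxJava. The sketch below uses a plain JDK IntStream, not RxJava; the class and method names are hypothetical. It records the order in which the mapping lambda receives each value and which threads run it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SequentialMapDemo {
    // Squares 1..10 with a sequential stream, recording the order in which the
    // mapping lambda receives each value and the name of every thread that runs it.
    public static List<Integer> squaresRecording(List<Integer> callOrder, Set<String> threadNames) {
        return IntStream.rangeClosed(1, 10)
                .map(v -> {
                    callOrder.add(v);
                    threadNames.add(Thread.currentThread().getName());
                    return v * v;
                })
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> callOrder = new ArrayList<>();
        Set<String> threadNames = new HashSet<>();
        List<Integer> squares = squaresRecording(callOrder, threadNames);
        System.out.println(squares);     // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
        System.out.println(callOrder);   // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]: one call at a time, in order
        System.out.println(threadNames); // [main]: every call ran on the calling thread
    }
}
```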

RxJava can still be asynchronous: the stages of a pipeline (range -> map -> subscribe) can run concurrently relative to each other. For example:

    Observable.range(1, 10)
        .subscribeOn(Schedulers.computation())
        .map(v -> v * v)                       // (1)
        .observeOn(Schedulers.io())
        .map(v -> -v)                          // (2)
        .toBlocking()
        .subscribe(System.out::println);       // (3)

Here, (1) may run in parallel with (2) and (3): while (2) is negating 9 (the square of 3), (1) may already be squaring 5, and (3) may be printing -1 at the same time.
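
This stage-level concurrency is the classic pipeline pattern: each stage handles its values one at a time, but different stages overlap in time. The sketch below is a rough JDK-only analog, not how RxJava is implemented internally, and the class name is hypothetical. It uses one single-thread executor per stage, BlockingQueues between the stages, and a sentinel value to mark the end of the stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class PipelineDemo {
    // End-of-stream marker; no stage below ever produces this value.
    private static final int DONE = Integer.MIN_VALUE;

    public static List<Integer> run() {
        BlockingQueue<Integer> squared = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> negated = new LinkedBlockingQueue<>();
        ExecutorService stage1 = Executors.newSingleThreadExecutor();
        ExecutorService stage2 = Executors.newSingleThreadExecutor();
        try {
            // Stage (1): squares 1..10, one value at a time, on its own thread.
            stage1.submit(() -> {
                for (int v = 1; v <= 10; v++) {
                    squared.add(v * v);
                }
                squared.add(DONE);
            });
            // Stage (2): negates each square on a second thread; it can work on
            // one value while stage (1) is already computing a later one.
            stage2.submit(() -> {
                try {
                    for (int v = squared.take(); v != DONE; v = squared.take()) {
                        negated.add(-v);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                negated.add(DONE);
            });
            // Stage (3): the calling thread drains the results in order.
            List<Integer> out = new ArrayList<>();
            for (int v = negated.take(); v != DONE; v = negated.take()) {
                out.add(v);
            }
            return out;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining the pipeline", e);
        } finally {
            stage1.shutdown();
            stage2.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [-1, -4, -9, -16, -25, -36, -49, -64, -81, -100]
    }
}
```

Each stage is still sequential on its own, so the output order matches the input order. The speed-up comes only from the stages overlapping, not from running one stage on several threads.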

If you want to process the elements of a sequence concurrently, you have to "fork out" the sequence into sub-Observables and then join the results back with flatMap:

    Observable.range(1, 10)
        .flatMap(v ->
            Observable.just(v)
                .subscribeOn(Schedulers.computation())
                .map(w -> w * w)   // new name: a nested lambda cannot redeclare v
        )
        .toBlocking()
        .subscribe(System.out::println);

Here, each value v starts a new Observable that runs on a background thread and computes map() there. v = 1 may run on thread 1, v = 2 on thread 2, and v = 3 on thread 1 again, but strictly after v = 1 has been computed. Because flatMap emits the inner results as they arrive, the squares are not guaranteed to be printed in input order.
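
The same fork-out/join-back shape applies to the question's filter: each isURLOK-style check becomes its own task on a thread pool, and the results are joined back afterwards. To keep the sketch runnable without RxJava, it swaps in the JDK's CompletableFuture in place of flatMap. The class and method names, and the looksLikeUrl predicate standing in for isURLOK, are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;

public class ParallelFilterDemo {
    // Fork: run the (possibly slow) check for every item as its own task on the pool.
    // Join: wait for all checks and keep the items that passed, in input order.
    public static List<String> parallelFilter(List<String> items, Predicate<String> check,
                                              ExecutorService pool) {
        List<CompletableFuture<Boolean>> checks = new ArrayList<>();
        for (String item : items) {
            checks.add(CompletableFuture.supplyAsync(() -> check.test(item), pool));
        }
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            if (checks.get(i).join()) {
                kept.add(items.get(i));
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Hypothetical stand-in for isURLOK: a real check would do network I/O.
            Predicate<String> looksLikeUrl = s -> s.startsWith("http://");
            List<String> lines = Arrays.asList(
                    "http://example.com/a.jpg", "not a url", "http://example.com/b.jpg");
            System.out.println(parallelFilter(lines, looksLikeUrl, pool));
            // prints [http://example.com/a.jpg, http://example.com/b.jpg]
        } finally {
            pool.shutdown();
        }
    }
}
```

Joining the futures in list order keeps the input order, at the cost of waiting for earlier checks before later results can be used. For I/O-bound checks like URL probing, the pool size largely determines how many checks are in flight at once.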

answered Sep 10 '25 07:09 by akarnokd