Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

subscribe_on with from_iterable/range in RxPY

I'm trying to get my head around scheduling in reactive extensions for python. I would like to use subscribe_on to process multiple observables in parallel. This works fine if the observable is created with just, but not if for example range or from_ are used.

just defaults to Scheduler.immediate, while other generators default to Scheduler.current_thread. Which causes the difference, but feels inconsistent to me. Probably because I don't grasp the full problem.
Consider the following example:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print "processing %s on thread %s" % (x, threading.currentThread().name)
    time.sleep(1)


def finish(x):
    print "finished %s on thread %s" % (x, threading.currentThread().name)


# Creates new thread (I like)
rx.Observable.just(3)\
    .do_action(work)\
    .subscribe_on(Scheduler.new_thread)\
    .subscribe(finish)

# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
    .do_action(work) \
    .subscribe_on(Scheduler.new_thread) \
    .subscribe(finish)

It works with observe_on or if the Scheduler is passed directly to the generator, but I would like to decouple observable creation from processing and achieve something like this:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print "processing %s on thread %s" % (x, threading.currentThread().name)
    time.sleep(1)


def finish(x):
    print "finished %s on thread %s" % (x, threading.currentThread().name)


def factory_single():
    return rx.Observable.just(1).do_action(work)


def factory_multiple():
    return rx.Observable.range(2, 4).do_action(work)


def process(factory):
    factory().subscribe_on(Scheduler.new_thread).subscribe(finish)

# Creates a new thread (I like)
process(factory_single)

# Runs on MainThread (I don't like)
process(factory_multiple)

Am I misunderstanding subscribe_on? Is my approach wrong?

like image 472
Philipp Melab Avatar asked Oct 20 '22 03:10

Philipp Melab


1 Answers

There are three actions in your example that can be scheduled independently:

  1. Data feed action. just and range use different schedulers by default, but there is not much difference between them. Both feed in initial values on the current thread. You can override their default schedulers to whatever you wish by passing it as a parameter to these methods.

  2. Subscribe action. Uses Scheduler.current_thread by default. I.e. it is executed on the same thread as data feed action. Can be overridden by subscribe_on method.

  3. Observe (on_next, on_error, on_completed) action. Uses Scheduler.current_thread by default. I.e. it is executed on the same thread as subscribe action. Can be overridden by observe_on method.

In case you override scheduler for only one of these actions the others shall follow as described above.

About schedulers

Scheduler.immediate does not really schedule anything. It invokes action immediately on the same thread where it was scheduled.

Scheduler.current_thread avoids recursion by queuing actions but still invokes action on the same thread where it was scheduled.

Scheduler.new_thread launches single background thread to execute actions one after another.

Scheduler.timeout launches new background thread for every action it needs to execute.

Attempt parallel processing

The most appropriate method for scheduling work in different threads seems to be observe_on.

The problem though is that there is no thread_pool scheduler in RxPy right now. new_thread scheduler launches just one thread so it will not help you much.

timeout scheduler can be used to go parallel but it offers no control on the number of concurrent threads, therefore explosive growth in the number of concurrent tasks can overflow memory and effectively crash your system.

NOT a bug in observe_on

I tried running your example with observe_on(Scheduler.timeout) but the tasks still did not go in parallel. After looking into the RxPy source I have discovered that it schedules next event only after current event completes, which effectively disables parallel processing. My first reaction was to report a bug in observe_on implementation.

But after further investigation I've found that serial execution is not a bug but rather intended behavior.

Proper way to execute tasks in parallel

Here is the code that works (based on this answer):

Observable.range(1, 3) \
  .select_many(lambda i: Observable.start(lambda: work(i), scheduler=Scheduler.timeout)) \
  .observe_on(Scheduler.event_loop) \
  .subscribe(finish)

Observable.start creates asynchronous observable that is scheduled on a separate thread via Scheduler.timeout.

observe_on(Scheduler.event_loop) is optional. It forces finish method for all items to be called on the same thread.

Please note that there is no guarantee that finish method is called in initial range order.

like image 117
Yaroslav Stavnichiy Avatar answered Nov 09 '22 17:11

Yaroslav Stavnichiy



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!