I'm trying to learn the joblib module as an alternative to the built-in multiprocessing module in Python. I'm used to using multiprocessing.Pool.imap to run a function over an iterable and get the results back as they come in. In this minimal working example, I can't figure out how to do it with joblib:
import joblib, time

def hello(n):
    time.sleep(1)
    print "Inside function", n
    return n

with joblib.Parallel(n_jobs=1) as MP:
    func = joblib.delayed(hello)
    for x in MP(func(x) for x in range(3)):
        print "Outside function", x
Which prints:
Inside function 0
Inside function 1
Inside function 2
Outside function 0
Outside function 1
Outside function 2
I'd like to see the output:
Inside function 0
Outside function 0
Inside function 1
Outside function 1
Inside function 2
Outside function 2
Or something similar, indicating that the iterable MP(...) is not waiting for all the results to complete. For a longer demo, change n_jobs=-1 and use range(100).
stovfl's answer is elegant, but it only works for the first batches dispatched. In the example it happens to work because the workers never starve (n_tasks < 2 * n_jobs). For this approach to work in general, the callback originally passed to apply_async must also be called. That callback is an instance of BatchCompletionCallBack, which schedules the next batch of tasks to be processed.
One possible solution is to wrap arbitrary callbacks up in a callable object, like this (tested with joblib==0.11, py36):
from joblib._parallel_backends import MultiprocessingBackend
from joblib import register_parallel_backend, parallel_backend
from joblib import Parallel, delayed
import time

class MultiCallback:
    def __init__(self, *callbacks):
        self.callbacks = [cb for cb in callbacks if cb]

    def __call__(self, out):
        for cb in self.callbacks:
            cb(out)

class ImmediateResultBackend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % result)

    def apply_async(self, func, callback=None):
        cbs = MultiCallback(callback, self.callback)
        return super().apply_async(func, cbs)

register_parallel_backend('custom', ImmediateResultBackend)

def hello(n):
    time.sleep(1)
    print("Inside function", n)
    return n

with parallel_backend('custom'):
    res = Parallel(n_jobs=2)(delayed(hello)(y) for y in range(6))
Output
Inside function 0
Inside function 1
ImmediateResult function [0]
ImmediateResult function [1]
Inside function 3
Inside function 2
ImmediateResult function [3]
ImmediateResult function [2]
Inside function 4
ImmediateResult function [4]
Inside function 5
ImmediateResult function [5]
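The MultiCallback wrapper itself is independent of joblib: it is just a callable that fans a single result out to every non-None callback, so both the user callback and joblib's internal BatchCompletionCallBack get invoked. A standalone sketch of that behaviour:

```python
class MultiCallback:
    def __init__(self, *callbacks):
        # Drop None entries so a missing original callback is harmless
        self.callbacks = [cb for cb in callbacks if cb]

    def __call__(self, out):
        # Invoke every registered callback with the same result
        for cb in self.callbacks:
            cb(out)

received = []
cb = MultiCallback(received.append, None, lambda r: received.append(r * 10))
cb(7)
print(received)  # -> [7, 70]; the None callback was filtered out
```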
To get immediate results from joblib, for instance:
import joblib, random, time
from joblib import delayed
from joblib._parallel_backends import MultiprocessingBackend

def hello(n):
    # Sleep a random amount so the processes finish in a different order
    time.sleep(n * random.randrange(1, 5))
    print("Inside function", n)
    return n

class ImmediateResult_Backend(MultiprocessingBackend):
    def callback(self, result):
        print("\tImmediateResult function %s" % (result))

    # Overload apply_async and set callback=self.callback
    def apply_async(self, func, callback=None):
        applyResult = super().apply_async(func, self.callback)
        return applyResult

joblib.register_parallel_backend('custom', ImmediateResult_Backend, make_default=True)

with joblib.Parallel(n_jobs=2) as parallel:
    func = parallel(delayed(hello)(y) for y in range(3))
    for f in func:
        print("Outside function %s" % (f))
Output:
Note: I use time.sleep(n * random.randrange(1,5)) in def hello(...), therefore the processes become ready at different times.
Inside function 0
Inside function 1
ImmediateResult function [0]
Inside function 2
ImmediateResult function [1]
ImmediateResult function [2]
Outside function 0
Outside function 1
Outside function 2
Tested with Python:3.4.2 - joblib:0.11