I want to apply a function in parallel using multiprocessing.Pool. The problem is that if one function call triggers a segmentation fault, the Pool hangs forever. Does anybody have an idea how I can make a Pool that detects when something like this happens and raises an error?
The following example shows how to reproduce it (requires scikit-learn > 0.14):
import numpy as np
from sklearn.ensemble import gradient_boosting
import time
from multiprocessing import Pool
class Bad(object):
    tree_ = None
def fit_one(i):
    if i == 3:
        # this will segfault
        bad = np.array([[Bad()] * 2], dtype=np.object)
        gradient_boosting.predict_stages(bad,
                                         np.random.rand(20, 2).astype(np.float32),
                                         1.0, np.random.rand(20, 2))
    else:
        time.sleep(1)
    return i
pool = Pool(2)
out = pool.imap_unordered(fit_one, range(10))
# we will never see 3
for o in out:
    print o
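As described in the comments, this just works in Python 3 if you use concurrent.futures.ProcessPoolExecutor instead of multiprocessing.Pool: when a worker process dies abruptly, the pending futures raise BrokenProcessPool instead of hanging.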
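A minimal sketch of the Python 3 approach, under a few assumptions: the segfault is simulated by having the worker send itself SIGSEGV via os.kill (POSIX only) instead of calling scikit-learn, and run_all and its start_method parameter are hypothetical names introduced only for this illustration.

```python
import multiprocessing
import os
import signal
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def fit_one(i):
    if i == 3:
        # Assumption: stand-in for the real segfault. The worker kills
        # itself with SIGSEGV (POSIX only).
        os.kill(os.getpid(), signal.SIGSEGV)
    return i


def run_all(items, max_workers=2, start_method=None):
    """Run fit_one over items; return (results, crashed).

    start_method is a hypothetical convenience parameter; None selects
    the platform's default multiprocessing start method.
    """
    ctx = multiprocessing.get_context(start_method)
    results = []
    with ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx) as executor:
        futures = [executor.submit(fit_one, i) for i in items]
        try:
            for future in futures:
                results.append(future.result())
        except BrokenProcessPool:
            # A worker died abruptly; every unfinished future now raises
            # this error instead of blocking forever.
            return results, True
    return results, False
```

Calling run_all(range(10)) from under an `if __name__ == "__main__":` guard should return with crashed set to True rather than hang. Which results were collected before the crash can vary from run to run, and the executor cannot be reused once it is broken.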
If you're stuck on Python 2, the best option I've found is to use the timeout argument on the result objects returned by Pool.apply_async and Pool.map_async. For example:
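pool = Pool(2)
out = pool.map_async(fit_one, range(10))
# map_async returns a single AsyncResult (it is not iterable); get() raises
# multiprocessing.TimeoutError if the results are not ready in time
print out.get(timeout=1000)  # allow 1000 seconds max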
This works as long as you have an upper bound for how long a child process should take to complete a task.
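If you would rather find out which task was lost than give up on the whole batch, one option is to submit each item with apply_async and put the timeout on every result separately. The following is only a sketch: collect is a hypothetical helper, and the segfault is again simulated with os.kill and SIGSEGV (POSIX only) instead of the scikit-learn call.

```python
import multiprocessing
import os
import signal


def fit_one(i):
    if i == 3:
        # Assumption: stand-in for the real segfault (POSIX only).
        os.kill(os.getpid(), signal.SIGSEGV)
    return i


def collect(pool, items, timeout):
    """Submit each item on its own; return (results, lost_items).

    timeout is the upper bound, in seconds, to wait for each result.
    """
    pending = [(i, pool.apply_async(fit_one, (i,))) for i in items]
    results, lost = [], []
    for i, async_result in pending:
        try:
            results.append(async_result.get(timeout=timeout))
        except multiprocessing.TimeoutError:
            # The worker running this task died, so its result never arrives.
            lost.append(i)
    return results, lost
```

With a pool created as multiprocessing.Pool(2), collect(pool, range(6), timeout=3) should report item 3 as lost while the other items complete, because the Pool replaces the dead worker but never reschedules its task. Call pool.terminate() afterwards rather than close() and join(), since the lost task never finishes.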