Let me start by saying that I'm not using a Queue, so this question is not a duplicate of this one and I'm not using a process pool, so it's not a duplicate of this one.
I have a Process object that uses a pool of thread workers to accomplish some task. For the sake of an MCVE, this task is just constructing a list of the integers from 0 to 9. Here's my source:
#!/usr/bin/env python3
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import Process
from sys import stdout

class Quest():
    def __init__(self):
        pass

    def doIt(self, i):
        return i

class Test(Process):
    def __init__(self, arg):
        super(Test, self).__init__()
        self.arg = arg
        self.pool = Pool()

    def run(self):
        quest = Quest()
        done = self.pool.map_async(quest.doIt, range(10), error_callback=print)
        stdout.flush()
        self.arg = [item for item in done.get()]

    def __str__(self):
        return str(self.arg)

    # I tried both with and without this method
    def join(self, timeout=None):
        self.pool.close()
        self.pool.join()
        super(Test, self).join(timeout)

test = Test("test")
print(test) # should print 'test' (and does)
test.start()
# this line hangs forever
_ = test.join()
print(test) # should print '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]'
This is a pretty rough model of what I want my actual program to do. The problem, as indicated in the comments, is that Test.join always hangs forever. That's totally independent of whether or not that method is overridden in the Test class. It also never prints anything, but the output when I send a KeyboardInterrupt signal indicates that the problem lies in getting the results from the workers:
test
^CTraceback (most recent call last):
  File "./test.py", line 44, in <module>
Process Test-1:
    _ = test.join()
  File "./test.py", line 34, in join
    super(Test, self).join(timeout)
  File "/path/to/multiprocessing/process.py", line 124, in join
    res = self._popen.wait(timeout)
  File "/path/to/multiprocessing/popen_fork.py", line 51, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/path/to/multiprocessing/popen_fork.py", line 29, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Traceback (most recent call last):
  File "/path/to/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "./test.py", line 25, in run
    self.arg = [item for item in done.get()]
  File "/path/to/multiprocessing/pool.py", line 638, in get
    self.wait(timeout)
  File "/path/to/multiprocessing/pool.py", line 635, in wait
    self._event.wait(timeout)
  File "/path/to/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/path/to/threading.py", line 295, in wait
    waiter.acquire()
KeyboardInterrupt
Why doesn't the process just exit? The only thing a worker does is a single dereference and a function call that executes one operation, so it should be really simple.
I forgot to mention: This works fine if I make Test a subclass of threading.Thread instead of multiprocessing.Process. I'm really not sure why this breaks it in half.
Your goal is to do this work asynchronously. Why not start the asynchronous pool workers from your main process without spawning a child process (class Test) at all? The results will be available in your main process, and nothing fancy needs to be done. You can stop reading here if you choose to do this. Otherwise, read on.
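A minimal sketch of that first option, assuming the same Quest class as in the question and nothing else: the thread pool lives in the main process, so the results come back there directly and there is no child process to join.

```python
from multiprocessing.pool import ThreadPool


class Quest:
    def doIt(self, i):
        return i


quest = Quest()

# The pool is created and used in the same process, so its worker
# threads actually exist when the tasks are submitted.
with ThreadPool() as pool:
    done = pool.map_async(quest.doIt, range(10), error_callback=print)
    # The main thread is free to do other work here before collecting.
    result = done.get()

print(result)
```

Because map_async returns immediately, the main thread only blocks at done.get(), which keeps the work asynchronous without a Process subclass.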
Your join is running forever because there are two separate pools: one created when you construct the process object (local to your main process), and another created when you fork the process by calling process.start() (local to the spawned process).
For example, this doesn't work:
def __init__(self, arg, shared):
    super(Test, self).__init__()
    self.arg = arg
    self.quest = Quest()
    self.shared = shared
    self.pool = Pool()

def run(self):
    iterable = list(range(10))
    self.shared.extend(self.pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
    print("1" + str(self.shared))
    self.pool.close()
However, this works:
def __init__(self, arg, shared):
    super(Test, self).__init__()
    self.arg = arg
    self.quest = Quest()
    self.shared = shared

def run(self):
    pool = Pool()
    iterable = list(range(10))
    self.shared.extend(pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
    print("1" + str(self.shared))
    pool.close()
This has to do with the fact that when you fork a process, the entire code, stack, and heap segments of your process are cloned into the new process, so your main process and subprocess have separate contexts.
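So, you are calling join() on the pool object local to your main process, and that calls close() on that pool. Then, in run(), there's another pool object that was cloned into the subprocess when start() was called; that pool was never closed and cannot be joined in the way you're doing it. Its worker threads were not cloned along with it either, because a forked child contains only the thread that called fork, so done.get() waits on tasks that nothing will ever run. That is exactly where your traceback shows the subprocess stuck. Simply put, your main process has no reference to the cloned pool object in the subprocess.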
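If you want to see why a pool created before start() is useless in the subprocess, here is a small sketch under one assumption: a POSIX system where the "fork" start method is available. The helper thread and the report_thread_count function are hypothetical stand-ins (the helper plays the role of a pool worker); the child reports how many threads it can see.

```python
import multiprocessing as mp
import threading


def report_thread_count(conn):
    # Runs in the forked child: count the threads that exist here.
    conn.send(threading.active_count())
    conn.close()


# Assumption: the "fork" start method is available (POSIX only).
ctx = mp.get_context("fork")

# An idle helper thread in the parent, standing in for a pool worker.
stop = threading.Event()
helper = threading.Thread(target=stop.wait)
helper.start()

parent_conn, child_conn = ctx.Pipe()
child = ctx.Process(target=report_thread_count, args=(child_conn,))
child.start()
threads_in_child = parent_conn.recv()
child.join()
threads_in_parent = threading.active_count()

# Let the helper thread finish so the script can exit cleanly.
stop.set()
helper.join()

print("parent:", threads_in_parent, "child:", threads_in_child)
```

The parent sees its main thread plus the helper, while the child sees only the thread that performed the fork. A ThreadPool copied into the child is in the same position: the object is there, but nothing is left to pull tasks off its queue.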
"This works fine if I make Test a subclass of threading.Thread instead of multiprocessing.Process. I'm really not sure why this breaks it in half."
That makes sense, because threads differ from processes in that they have independent call stacks but share the other segments of memory, so any update one thread makes to an object is visible to the other threads of your main process (which owns these threads), and vice versa.
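A rough illustration of that difference, assuming a POSIX system with the "fork" start method; ThreadTest and ProcessTest are hypothetical names used only for this sketch. Both classes overwrite self.arg in run(), but only the thread's update is visible to the caller afterwards.

```python
import multiprocessing as mp
import threading

# Assumption: the "fork" start method is available (POSIX only).
fork_ctx = mp.get_context("fork")


class ThreadTest(threading.Thread):
    def __init__(self):
        super().__init__()
        self.arg = "test"

    def run(self):
        # Runs in the same address space as the caller.
        self.arg = list(range(10))


class ProcessTest(fork_ctx.Process):
    def __init__(self):
        super().__init__()
        self.arg = "test"

    def run(self):
        # Runs in the child: this only changes the child's copy.
        self.arg = list(range(10))


t = ThreadTest()
t.start()
t.join()

p = ProcessTest()
p.start()
p.join()

print("thread: ", t.arg)
print("process:", p.arg)
```

The thread version ends up holding the list, while the process version still holds 'test' in the parent, which is why a Process needs some explicit channel to hand its results back.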
The resolution is to create the pool object locally in the run() function, close it in the subprocess context, and join the subprocess in the main process. That still leaves the question of getting the results back to the main process.
If someone more experienced with Manager() wants to chime in on its innards, that'd be cool. Either way, the following code gives you your expected behavior:
#!/usr/bin/env python3
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import Process, Manager
from sys import stdout

class Quest():
    def __init__(self):
        pass

    def doIt(self, i):
        return i

class Test(Process):
    def __init__(self, arg, shared):
        super(Test, self).__init__()
        self.arg = arg
        self.quest = Quest()
        self.shared = shared

    def run(self):
        with Pool() as pool:
            iterable = list(range(10))
            self.shared.extend(pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
            print("1" + str(self.shared)) # can remove, just to make sure we've updated state

    def __str__(self):
        return str(self.arg)

with Manager() as manager:
    res = manager.list()
    test = Test("test", res)
    print(test) # should print 'test' (and does)
    test.start()
    test.join()
    print("2" + str(res)) # should print '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]'
Outputs:
rpg711$ python multiprocess_async_join.py
test
1[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
2[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]