I am trying to adapt this multiprocessing tutorial for my own purposes. At first I thought it did not scale well, but when I made a reproducible example I found that once the list has more than 124 items, it never returns. With x = 124 it runs in 0.4 seconds, but with x = 125 it never finishes. I am running Python 2.7 on Windows 7.
```python
from multiprocessing import Lock, Process, Queue, current_process
import time


class Testclass(object):
    def __init__(self, x):
        self.x = x


def toyfunction(testclass):
    testclass.product = testclass.x * testclass.x
    return testclass


def worker(work_queue, done_queue):
    try:
        for testclass in iter(work_queue.get, 'STOP'):
            print(testclass.counter)
            newtestclass = toyfunction(testclass)
            done_queue.put(newtestclass)
    except:
        print('error')
    return True


def main(x):
    counter = 1
    database = []
    while counter <= x:
        database.append(Testclass(10))
        counter += 1
        print(counter)
    workers = 8
    work_queue = Queue()
    done_queue = Queue()
    processes = []
    start = time.clock()
    counter = 1
    for testclass in database:
        testclass.counter = counter
        work_queue.put(testclass)
        counter += 1
        print(counter)
    print('items loaded')
    for w in range(workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)
        work_queue.put('STOP')
    for p in processes:
        p.join()
    done_queue.put('STOP')
    newdatabase = []
    for testclass in iter(done_queue.get, 'STOP'):
        newdatabase.append(testclass)
    print(time.clock()-start)
    print("Done")
    return(newdatabase)


if __name__ == '__main__':
    database = main(124)
    database2 = main(125)
```
OK! From the docs:
Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children. Note that a queue created using a manager does not have this issue. See Programming guidelines.
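The last point in that warning (manager queues don't have this issue) can be illustrated with a minimal sketch. The names `square_worker` and `run_with_manager_queues` are hypothetical, not from the question. Queues created through `multiprocessing.Manager()` are proxies to a queue living in the manager's server process, so a worker's `put()` does not leave data buffered in a feeder thread inside the worker, and joining before reading `done_queue` should not deadlock. The trade-off is that every `put()`/`get()` is a round trip to the manager process, so this is likely slower than a plain `Queue`.

```python
from multiprocessing import Manager, Process


def square_worker(work_queue, done_queue):
    # Consume numbers until the 'STOP' sentinel and push their squares.
    for n in iter(work_queue.get, 'STOP'):
        done_queue.put(n * n)


def run_with_manager_queues(items, workers=4):
    manager = Manager()
    try:
        # Both queues live in the manager's server process; the workers
        # only hold proxies to them.
        work_queue = manager.Queue()
        done_queue = manager.Queue()
        for n in items:
            work_queue.put(n)
        for _ in range(workers):
            work_queue.put('STOP')  # one sentinel per worker

        processes = [Process(target=square_worker, args=(work_queue, done_queue))
                     for _ in range(workers)]
        for p in processes:
            p.start()

        # Joining BEFORE draining done_queue: safe with a manager queue,
        # but a potential deadlock with a plain multiprocessing.Queue.
        for p in processes:
            p.join()

        # Every item produced exactly one result, so read len(items) of them.
        return [done_queue.get() for _ in range(len(items))]
    finally:
        manager.shutdown()


if __name__ == '__main__':
    print(sorted(run_with_manager_queues(list(range(10)))))
```

Results arrive in whatever order the workers finish, hence the `sorted()` in the usage line.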
As I noted in a comment earlier, the code calls .join() on the worker processes before the done_queue Queue has been drained. After changing the code (in a funky way) to make sure done_queue was drained before .join()'ing, it worked fine for a million items.
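So this is a case of pilot error, although quite an obscure one. As to why the behavior depends on the number passed to main(x), it's unpredictable: it depends on how buffering is done internally. Roughly, the workers' results travel through a pipe with a finite OS buffer. While the pending results fit in that buffer, each worker can flush its data and exit; once they don't, a worker blocks until someone reads from done_queue, and meanwhile the parent is stuck in .join() waiting for that worker. Such fun ;-)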
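Here is a minimal sketch of the drain-then-join fix applied to the question's code. It is not the answerer's own "funky" change, which wasn't posted. It assumes each work item yields exactly one result, so the parent reads exactly `len(database)` results from `done_queue` before calling `.join()`, which makes the `'STOP'` sentinel on `done_queue` unnecessary. It also swaps `time.clock()` for `time.time()` (the former was removed in Python 3.8), drops the per-item prints, and starts the workers before filling `work_queue`.

```python
import time
from multiprocessing import Process, Queue


class Testclass(object):
    def __init__(self, x):
        self.x = x


def toyfunction(testclass):
    testclass.product = testclass.x * testclass.x
    return testclass


def worker(work_queue, done_queue):
    # Process items until the 'STOP' sentinel arrives.
    for testclass in iter(work_queue.get, 'STOP'):
        done_queue.put(toyfunction(testclass))


def main(x, workers=8):
    database = [Testclass(10) for _ in range(x)]
    for counter, testclass in enumerate(database, 1):
        testclass.counter = counter

    work_queue = Queue()
    done_queue = Queue()
    start = time.time()

    processes = []
    for _ in range(workers):
        p = Process(target=worker, args=(work_queue, done_queue))
        p.start()
        processes.append(p)

    for testclass in database:
        work_queue.put(testclass)
    for _ in range(workers):
        work_queue.put('STOP')  # one sentinel per worker

    # Drain BEFORE joining: read one result per input item. This lets each
    # worker's queue feeder thread flush its buffered results to the pipe,
    # after which the worker can exit.
    newdatabase = [done_queue.get() for _ in range(len(database))]

    # Only now is it safe to join the workers.
    for p in processes:
        p.join()

    print(time.time() - start)
    print("Done")
    return newdatabase


if __name__ == '__main__':
    database = main(124)
    database2 = main(125)
```

One design caveat: the fixed-count drain relies on every item producing a result. If a worker can fail partway through, it should still put something (for example, an error marker) on `done_queue` so the count matches. Otherwise the parent will block in `get()` instead of in `join()`.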