
Is there any way to use multiprocessing.Pool within a nested function or module?

Thanks for taking a look at this. I confess I have been dabbling with parallel processing in Python for all of one week now, so I apologize if there is an obvious solution I missed. I have a piece of code in which I would like to run several different instances of mp.Pool(). The ones in the main .py file worked fine, but when I moved them into functions in modules I got no output from them at all; the app just runs past them and continues. I think it may have something to do with this post, but it didn't give me any ideas for alternative ways to accomplish what I need. The code that works in a simple example is this:

import multiprocessing as mp

thread_count = 4  # number of worker processes (not defined in the original snippet)

def multiproc_log_result(retval):
    results.append(retval)
    if len(results) % (10 // 10) == 0:
        print('{0}% done'.format(100 * len(results) / 10))

def meat():
    print('beef')
    status = True
    return status

results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(meat, callback=multiproc_log_result)
pool.close()
pool.join()


def veggie():
    print('carrot')
    status = True
    return status

results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(veggie, callback=multiproc_log_result)
pool.close()
pool.join()

And the code that doesn't work is:

import multiprocessing as mp

thread_count = 4  # number of worker processes (not defined in the original snippet)

def multiproc_log_result(retval):
    results.append(retval)
    if len(results) % (10 // 10) == 0:
        print('{0}% done'.format(100 * len(results) / 10))

def meat():
    print('beef')
    status = True
    return status

results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(meat, callback=multiproc_log_result)
pool.close()
pool.join()

def nested_stupid_fn():
    def multiproc_log_result(retval):
        results.append(retval)
        if len(results) % (10 // 10) == 0:
            print('{0}% done'.format(100 * len(results) / 10))

    def veggie():
        print('carrot')
        status = True
        return status

    results = []
    pool = mp.Pool(thread_count)
    for x in range(10):
        pool.apply_async(veggie, callback=multiproc_log_result)
    pool.close()
    pool.join()

nested_stupid_fn()

Ultimately I would like the example that doesn't work to be one more step removed, by having it live in a function in a separate module. That is, I want to import the module packngo, call packngo.basic_packngo(inputs), and have the pool code (like the contents of the nested function above) run somewhere inside it. Any help would be greatly appreciated. :D I am a very simple man, so if you could explain it as you would to a child, maybe it will sink in!

asked Oct 23 '25 20:10 by matrimcauthon


1 Answer

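The other question you linked has the solution; it's just not spelled out. You cannot use nested functions as the func argument for the apply*/*map* family of methods on multiprocessing.Pool (apply, apply_async, map, imap, starmap and so on). They work for multiprocessing.dummy.Pool, because multiprocessing.dummy is backed by threads, which can pass function references around directly, but multiprocessing.Pool must pickle the functions to send them to its worker processes, and only functions with importable names can be pickled (pickle records a function by its module and qualified name, not by its code). If you check the name of a nested function, it's something like modulename.outerfuncname.<locals>.innerfuncname, and that <locals> component makes it impossible to import. That is usually a good thing: nested functions that make use of being nested usually have critical state in closure scope, which mere importing would lose.

As for why the app "just runs past it": apply_async doesn't raise the pickling error at the call site. The error is stored on the AsyncResult it returns and only surfaces if you call .get() on that result (or, on Python 3, pass an error_callback), so code that ignores the return value never sees it.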
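To see the difference without any process pool involved, you can pickle both kinds of function directly. This is a minimal sketch using only the standard library; top_level, outer and inner are made-up names for illustration. It also runs the nested function through multiprocessing.dummy.Pool, which works because a thread pool never pickles its tasks.

```python
import pickle
from multiprocessing.dummy import Pool as ThreadPool


def top_level():
    # Defined at module level: importable by name, so picklable.
    return True


def outer():
    def inner():
        # Defined inside outer(): its qualified name contains '<locals>'.
        return True
    return inner


nested = outer()
print(top_level.__qualname__)  # top_level
print(nested.__qualname__)     # outer.<locals>.inner

# Pickling a function only records where to import it from, so unpickling
# hands back the very same object.
data = pickle.dumps(top_level)
print(pickle.loads(data) is top_level)  # True

try:
    pickle.dumps(nested)
except (pickle.PicklingError, AttributeError) as exc:
    # The exact exception type and message vary between Python versions.
    print('cannot pickle nested function:', exc)

# A thread pool passes the function object around directly, so the nested
# function is fine here.
with ThreadPool(2) as tp:
    print(tp.apply_async(nested).get())  # True
```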

It's perfectly fine for the callback functions to be defined in a nested fashion: they're executed in the parent process and are never sent to the workers. In your case, only the callback relies on closure scope, so you can simply move the func (veggie) out to global scope, defining your packngo module as:

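import multiprocessing as mp

thread_count = 4  # number of worker processes

def veggie():
    print('carrot')
    status = True
    return status

def nested_stupid_fn():
    def multiproc_log_result(retval):
        results.append(retval)
        if len(results) % (10 // 10) == 0:
            print('{0}% done'.format(100 * len(results) / 10))

    results = []
    pool = mp.Pool(thread_count)
    for x in range(10):
        pool.apply_async(veggie, callback=multiproc_log_result)
    pool.close()
    pool.join()

if __name__ == '__main__':
    # Only run when executed directly, not when imported (worker processes
    # may import this module, and must not start pools of their own).
    nested_stupid_fn()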

Yes, it means veggie becomes a public member of the module in question. You can prefix it with an underscore (_veggie) if you want to indicate it should be considered an implementation detail, but it must necessarily be global to use it with multiprocessing.Pool.
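Applied to the packngo.basic_packngo(inputs) shape from the question, a minimal sketch might look like the following. The body of basic_packngo, its n_tasks/processes parameters and demo_nested_failure are hypothetical stand-ins, not anything from the question. It also passes an error_callback (available on Python 3), which is one way to make a failure such as a pickling error visible instead of having the task quietly never run.

```python
import multiprocessing as mp


def _veggie():
    # Module-level (global) worker: picklable by name, so it can be sent to
    # the worker processes. The leading underscore marks it as private.
    print('carrot')
    return True


def basic_packngo(n_tasks, processes=2):
    # Hypothetical signature: the question's `inputs` is reduced to a task count.
    results = []
    errors = []

    def log_result(retval):
        # Nested callback: runs in the parent process, so closures are fine.
        results.append(retval)
        print('{0:.0f}% done'.format(100 * len(results) / n_tasks))

    def log_error(exc):
        # Without this (or calling .get() on the AsyncResult), errors such as
        # pickling failures are never reported.
        errors.append(exc)
        print('task failed: {0!r}'.format(exc))

    pool = mp.Pool(processes)
    for _ in range(n_tasks):
        pool.apply_async(_veggie, callback=log_result, error_callback=log_error)
    pool.close()
    pool.join()
    return results, errors


def demo_nested_failure():
    # Hypothetical demo: a nested func cannot be pickled, so the task never
    # reaches a worker and error_callback receives the exception instead.
    def nested_veggie():
        return True

    errors = []
    pool = mp.Pool(1)
    pool.apply_async(nested_veggie, error_callback=errors.append)
    pool.close()
    pool.join()
    return errors


if __name__ == '__main__':
    # The guard keeps child processes (which import this module under the
    # spawn/forkserver start methods) from starting pools of their own.
    results, errors = basic_packngo(3)
    print(results, errors)
    print(demo_nested_failure())
```

Calling it from another script would then be import packngo followed by packngo.basic_packngo(10), ideally from inside that script's own if __name__ == '__main__': block.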

answered Oct 25 '25 08:10 by ShadowRanger