 

Grabbing stdout of a function with multiprocessing

Tags:

python

I have a Python function which accepts an argument and prints a number of text lines to stdout. I grab the stdout of this function and extract the relevant information.

The call to this function is very costly; it takes a few minutes to complete. To speed up the computation, I use Python multiprocessing to run this function in parallel. The problem now is how to differentiate the stdout of each worker.

What would be the simplest way to keep track of each worker's output to stdout? Can I redirect each worker's output to its own file descriptor and then read each file descriptor at the other end?

Note: I have no control over the function writing to stdout.

asked Sep 06 '25 by Vladislavs Dovgalecs

1 Answer

Assuming you're using a separate Process for each task (which has downsides—e.g., running 200 tasks all at once will generally be slower than running 8 at a time, especially if you're on a platform like Windows where process spawning is a bit expensive—but may be worth it), this isn't too hard to do.
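
(If you do want the 8-at-a-time behavior instead, a Pool can apply the same redirection trick. Here's a rough sketch, assuming task is a picklable top-level function and that run_redirected and args_list are placeholder names I made up; the sys.stdout replacement it relies on is explained below.)

import multiprocessing
import os
import sys
import tempfile

def run_redirected(task, path, args):
    # Runs inside a pool worker: point this process's stdout at a
    # per-task file, then call the real task. Every task reassigns
    # sys.stdout on entry, so outputs never interleave.
    with open(path, 'w') as f:
        sys.stdout = f
        return task(*args)

tempdir = tempfile.mkdtemp()
with multiprocessing.Pool(8) as pool:
    results = [
        pool.apply_async(run_redirected,
                         (task, os.path.join(tempdir, str(i)), args))
        for i, args in enumerate(args_list)
    ]
    for r in results:
        r.wait()
# each task's output is now in tempdir/0, tempdir/1, ...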

The first key is that you have to replace sys.stdout in the child process, not the parent. Where can you do that? You can either subclass Process so that its run method first does your setup work and then calls the super's run, or you can wrap each task's function in a function that does the setup work and then calls the real function.
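
A minimal sketch of the subclassing route (the class name OutputCapturingProcess and its path argument are made up for illustration; only the run override matters):

import multiprocessing
import sys

class OutputCapturingProcess(multiprocessing.Process):
    def __init__(self, path, **kwargs):
        super().__init__(**kwargs)   # accepts target=, args=, name=, etc.
        self._path = path

    def run(self):
        # run() executes in the child, so swapping sys.stdout here
        # affects only this worker, never the parent.
        with open(self._path, 'w') as f:
            sys.stdout = f
            super().run()            # calls the target passed to __init__

You'd then create OutputCapturingProcess(path, target=task) and start/join it as usual; the wrapper-function route, shown below, gets the same effect without a subclass.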

Next, what can you replace it with? You can have it write to a separate Pipe that's passed in by the parent process, and at least on Unix, just setting sys.stdout to the pipe's write end should just work; I'm not sure it does on Windows (a sketch of that pipe variant appears after the example below). So if you want it to be cross-platform (and since you won't tell me what platform(s) you care about, that appears to be necessary), writing to files is dead simple. For example:

import multiprocessing
import os
import shutil
import sys
import tempfile

# `task` is your costly function; `do_stuff_with` is whatever parses
# its captured output.
tempdir = tempfile.mkdtemp()

def wrap(task, name):
    def wrapper(*args, **kwargs):
        # Runs in the child: redirect this worker's stdout into its
        # own file before calling the real function.
        with open(os.path.join(tempdir, name), 'w') as f:
            sys.stdout = f
            task(*args, **kwargs)
    return wrapper

procs = []
for i in range(8):
    name = str(i)
    # Pass args=.../kwargs=... here if your task takes arguments.
    # (Note: this nested wrapper isn't picklable, so under the 'spawn'
    # start method (the Windows default) you'd need a top-level
    # wrapper like the Pool sketch above.)
    proc = multiprocessing.Process(target=wrap(task, name), name=name)
    proc.start()
    procs.append(proc)
for proc in procs:
    proc.join()
    # each worker's output sits in the file named after the process
    with open(os.path.join(tempdir, proc.name)) as f:
        do_stuff_with(f.read())
shutil.rmtree(tempdir)
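
For completeness, the pipe variant mentioned above might look like this on Unix. This is a sketch, assuming the 'fork' start method (so the child inherits the pipe's file descriptors); worker and task_args are made-up names, while task and do_stuff_with are the same placeholders as in the example. Note that the parent reads before join(), because a child that fills the pipe buffer would otherwise block forever:

import multiprocessing
import os
import sys

def worker(task, write_fd, read_fd, args):
    os.close(read_fd)                     # the child only writes
    sys.stdout = os.fdopen(write_fd, 'w')
    task(*args)
    sys.stdout.close()                    # gives the parent its EOF

read_fd, write_fd = os.pipe()
proc = multiprocessing.Process(target=worker,
                               args=(task, write_fd, read_fd, task_args))
proc.start()
os.close(write_fd)                        # the parent only reads
with os.fdopen(read_fd) as f:
    output = f.read()                     # returns once the child closes its end
proc.join()
do_stuff_with(output)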
answered Sep 08 '25 by abarnert