I have a Python function which accepts an argument and prints a number of text lines to stdout. I grab the stdout of this function and extract the relevant information.
The call to this function is very costly; it takes a few minutes to complete. To speed up the computation, I use Python multiprocessing to run this function in parallel. The problem now is how to differentiate the stdout of each worker.
What would be the simplest way to keep track of each worker's output to stdout? Can I redirect each worker's output to some file descriptor and then read each file descriptor at the other end?
Note: I have no control over the function writing to stdout.
Assuming you're using a separate Process for each task, this isn't too hard to do. That approach has downsides (for example, launching 200 tasks all at once will generally be slower than running 8 at a time, especially on a platform like Windows where spawning a process is a bit expensive), but it may well be worth it.
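If you do end up with many more tasks than cores, one simple way to keep the process count down is to launch the workers in fixed-size batches. A rough sketch (tasks here is a hypothetical list of zero-argument callables, not something from the question):

import multiprocessing

def run_in_batches(tasks, batch_size=8):
    # Launch at most `batch_size` worker processes at a time.
    for start in range(0, len(tasks), batch_size):
        procs = [multiprocessing.Process(target=t)
                 for t in tasks[start:start + batch_size]]
        for p in procs:
            p.start()
        for p in procs:
            p.join()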
The first key is that you have to replace sys.stdout in the child process, not in the parent. Where can you do that? You can either subclass Process so that its run method first does your setup work and then calls the superclass's run, or you can wrap each task's function in a function that first does the setup work and then calls the real function.
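For the first option, a minimal sketch of the subclassing variant might look like this (the class name OutputProcess and its outfile argument are made up for illustration):

import multiprocessing
import sys

class OutputProcess(multiprocessing.Process):
    # Hypothetical subclass: redirects stdout in the child before the target runs.
    def __init__(self, outfile, **kwargs):
        super().__init__(**kwargs)
        self.outfile = outfile      # path this worker's output goes to

    def run(self):
        # This runs in the child process, not the parent.
        with open(self.outfile, 'w') as f:
            sys.stdout = f
            super().run()           # Process.run() calls the target with its args

A worker would then be created as, say, OutputProcess('worker-0.txt', target=task, args=(some_arg,)), and the parent would read worker-0.txt after joining it.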
Next, what can you replace it with? You could have it write to a separate Pipe passed in by the parent process. On Unix, at least, just setting sys.stdout to a pipe's write end should just work, but I'm not sure it does on Windows. So if you want it to be cross-platform (and since you won't tell me which platform(s) you care about, that appears to be necessary), writing to files is dead simple. (A sketch of the pipe-based variant follows after the file example below.)
For example:
import multiprocessing
import os
import shutil
import sys
import tempfile

tempdir = tempfile.mkdtemp()

def wrap(task, name):
    # Return a wrapper that redirects the child's stdout to a per-worker file
    # before calling the real task.
    def wrapper(*args, **kwargs):
        with open(os.path.join(tempdir, name), 'w') as f:
            sys.stdout = f
            task(*args, **kwargs)
    return wrapper

procs = []
for i in range(8):
    name = str(i)
    proc = multiprocessing.Process(target=wrap(task, name), name=name, ...)
    proc.start()
    procs.append(proc)
for proc in procs:
    proc.join()
    # Each worker's output is in the file named after the process.
    with open(os.path.join(tempdir, proc.name)) as f:
        do_stuff_with(f.read())
shutil.rmtree(tempdir)
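And if you do want to experiment with the pipe approach on Unix, a rough sketch might look like the following. It assumes the default fork start method (so the pipe's file descriptors are inherited by the child); noisy_task is a stand-in for your real function:

import multiprocessing
import os
import sys

def noisy_task(n):
    # Stand-in for the real function that prints to stdout.
    for i in range(n):
        print('line', i)

def run_redirected(task, write_fd, *args):
    # Runs in the child: wrap the inherited pipe write end as a text file
    # and make it the new sys.stdout before calling the task.
    sys.stdout = os.fdopen(write_fd, 'w', buffering=1)
    task(*args)
    sys.stdout.close()

if __name__ == '__main__':
    read_fd, write_fd = os.pipe()
    proc = multiprocessing.Process(target=run_redirected,
                                   args=(noisy_task, write_fd, 3))
    proc.start()
    os.close(write_fd)          # close the parent's copy of the write end
    with os.fdopen(read_fd) as reader:
        output = reader.read()  # everything the child printed
    proc.join()
    print('captured:', repr(output))

Note that the parent has to close its own copy of the write end after starting the child; otherwise reader.read() never sees end-of-file.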