I'd like to know how multiprocessing is done right. Assume I have a list [1,2,3,4,5] generated by function f1, which is written to a Queue (left green circle). Now I start two processes pulling from that queue (by executing f2 in the processes). They process the data (say, by doubling each value) and write it to the second queue. Function f3 then reads this data and prints it out.

Inside the functions there is a kind of a loop, trying to read from the queue forever. How do I stop this process?
Idea 1
f1 sends not only the list but also a None object or a custom object (class PipelineTerminator: pass, or some such), which is simply propagated all the way down. f3 waits for None to arrive and breaks out of its loop when it does. Problem: one of the two f2s may read and propagate the None while the other one is still processing a number. Then the last value is lost.
Idea 2
f3 is f1. So the function f1 generates the data and the pipes, spawns the processes with f2 and feeds all data. After spawning and feeding, it listens on the second pipe, simply counting and processing the received objects. Because it knows how much data it fed, it can terminate the processes executing f2. But if the goal is to set up a processing pipeline, the different steps should be separable. So f1, f2 and f3 are different elements of a pipeline, and the expensive steps are done in parallel.
Idea 3

Each piece of the pipeline is a function. This function spawns processes as it sees fit and is responsible for managing them. It knows how much data came in and how much data has been returned (with yield, maybe). So it's safe to propagate a None object.
setup child processes
execute thread one and two and wait until both finished

thread 1:
    while True:
        pull from input queue
        if None: break and set finished_flag
        else: push to queue1 and increment counter1

thread 2:
    while True:
        pull from queue2
        increment counter2
        yield result
        if counter1 == counter2 and finished_flag: break

when both threads finished: kill process pool and return.
(Instead of using threads, maybe one can think of a smarter solution.)
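One possible reading of Idea 3, as a minimal sketch only: each stage is a generator function that owns its worker processes and yields results. Here `multiprocessing.Pool.imap` stands in for the two threads and the counters in the pseudocode above, since it already tracks which inputs still have results outstanding. The names `parallel_stage`, `double`, `source` and `sink` are made up for illustration.

```python
import multiprocessing as mp


def double(value):
    # The "expensive" step; must be a top-level function so it can be pickled.
    return value * 2


def parallel_stage(func, items, num_workers=2):
    """Generator stage that owns its pool and yields results in input order."""
    with mp.Pool(processes=num_workers) as pool:
        # imap returns results in the same order as the inputs.
        for result in pool.imap(func, items):
            yield result
    # Leaving the with-block shuts the pool down.


def source():
    # Plays the role of f1: produces the data.
    yield from [1, 2, 3, 4, 5]


def sink(results):
    # Plays the role of f3: consumes the results.
    for value in results:
        print(value)


if __name__ == "__main__":
    sink(parallel_stage(double, source()))
```

Because every stage takes an iterable and yields values, stages can be plugged together the same way as single-process generator pipelines, and no None object has to travel between them.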
So ...
I have implemented a solution following idea 2, feeding and waiting for the results to arrive, but it was not really a pipeline with independent functions plugged together. It worked for the task I had to manage, but was hard to maintain.
I'd like to hear from you now how you implement pipelines (easy in one process with generator functions and so on, but with multiple processes?) and manage them usually.
What is a Pipe? In Python's multiprocessing module, a pipe is a connection between two processes. One process sends data into it and the other process receives it. Under the covers, a pipe is a pair of connection objects provided by the multiprocessing module.
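As a rough illustration of a pipe between a parent and one child process, the sketch below sends numbers to the child and reads the doubled values back. The function names are hypothetical, and using None as the stop signal is an assumption of this example, not something `Pipe` requires.

```python
import multiprocessing as mp


def double_over_pipe(conn):
    """Child side: receive numbers, send back doubles, stop on None."""
    while True:
        value = conn.recv()
        if value is None:  # None is this example's stop signal
            break
        conn.send(value * 2)
    conn.close()


def run_pipe_demo(values):
    # Pipe() returns two Connection objects, one for each end.
    parent_conn, child_conn = mp.Pipe()
    child = mp.Process(target=double_over_pipe, args=(child_conn,))
    child.start()
    results = []
    for value in values:
        parent_conn.send(value)
        results.append(parent_conn.recv())
    parent_conn.send(None)  # tell the child to exit
    child.join()
    return results


if __name__ == "__main__":
    print(run_pipe_demo([1, 2, 3, 4, 5]))
```

A pipe connects exactly two endpoints, so for several workers pulling from one source a Queue is usually the more natural fit.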
Multiprocessing is the ability of a system to use more than one processor at the same time. On a computer with a single processor, the operating system instead switches between processes to keep all of them running.
We can return a value from a process using the multiprocessing.Queue class. A queue is a data structure to which items are added by calling put() and from which items are retrieved by calling get().
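A small sketch of that idea, with hypothetical function names: the child puts its result on a queue and the parent reads it with get().

```python
import multiprocessing as mp


def square_worker(number, result_queue):
    # The child has no direct way to return a value to the parent,
    # so it puts the result on the shared queue instead.
    result_queue.put(number * number)


def square_in_child(number):
    result_queue = mp.Queue()
    child = mp.Process(target=square_worker, args=(number, result_queue))
    child.start()
    result = result_queue.get()  # blocks until the child has put its result
    child.join()
    return result


if __name__ == "__main__":
    print(square_in_child(7))
```

The result is read before join() on purpose: a child that has put data on a queue may not exit until that data has been consumed.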
Use cases for multiprocessing: multiprocessing outshines threading when the program is CPU-intensive and does little I/O or user interaction. A process may have multiple threads, which share memory and are the units of execution within the process.
With the MPipe module, simply do this:
from mpipe import OrderedStage, Pipeline

def f1(value):
    return value * 2

def f2(value):
    print(value)

s1 = OrderedStage(f1, size=2)
s2 = OrderedStage(f2)
p = Pipeline(s1.link(s2))

for task in 1, 2, 3, 4, 5, None:
    p.put(task)
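The above runs 4 processes: two for the first stage (function f1, since size=2), one for the second stage (function f2), and one more for the main program that feeds the pipeline.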
The MPipe cookbook offers some explanation of how processes are shut down internally using None as the last task.
To run the code, install MPipe:
virtualenv venv
venv/bin/pip install mpipe
venv/bin/python prog.py
Output:
2
4
6
8
10
For Idea 1, how about:
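import multiprocessing as mp

sentinel = None

def f2(inq, outq):
    # Worker: double each value until the sentinel arrives, then exit.
    while True:
        val = inq.get()
        if val is sentinel:
            break
        outq.put(val * 2)

def f3(outq):
    # Printer: print results until the sentinel arrives.
    while True:
        val = outq.get()
        if val is sentinel:
            break
        print(val)

def f1():
    num_workers = 2
    inq = mp.Queue()
    outq = mp.Queue()
    for i in range(5):
        inq.put(i)
    # One sentinel per worker, so that every worker receives exactly one.
    for i in range(num_workers):
        inq.put(sentinel)
    workers = [mp.Process(target=f2, args=(inq, outq)) for i in range(num_workers)]
    printer = mp.Process(target=f3, args=(outq,))
    for w in workers:
        w.start()
    printer.start()
    # Wait for all workers to finish before telling the printer to stop.
    for w in workers:
        w.join()
    outq.put(sentinel)
    printer.join()

if __name__ == '__main__':
    f1()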
The only difference from the description of Idea 1 is that f2 does not propagate the sentinel: it just breaks out of its while-loop when it receives one (thus terminating itself). f1 blocks until the workers are done (using w.join()) and only then sends f3 the sentinel, signaling it to break out of its while-loop. At that point every result is already on the output queue, so no value is lost.
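A variation on the same idea, sketched under a few assumptions: each worker forwards its own sentinel after its last result, and the consumer stops once it has counted one sentinel per worker. This way the feeder does not have to join the workers before the consumer can be told to stop. The names `worker`, `collect` and `run_pipeline` are illustrative, and for brevity the consumer runs in the parent process and returns the values instead of printing them.

```python
import multiprocessing as mp

SENTINEL = None


def worker(inq, outq):
    while True:
        val = inq.get()
        if val is SENTINEL:
            # Forward this worker's sentinel after its last result.
            outq.put(SENTINEL)
            break
        outq.put(val * 2)


def collect(outq, num_workers):
    """Yield results until every worker has reported that it is done."""
    finished = 0
    while finished < num_workers:
        val = outq.get()
        if val is SENTINEL:
            finished += 1
        else:
            yield val


def run_pipeline(values, num_workers=2):
    inq, outq = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=worker, args=(inq, outq))
               for _ in range(num_workers)]
    for w in workers:
        w.start()
    for v in values:
        inq.put(v)
    for _ in range(num_workers):
        inq.put(SENTINEL)  # one sentinel per worker
    results = list(collect(outq, num_workers))
    for w in workers:
        w.join()
    return results


if __name__ == "__main__":
    # Results may arrive in any order, so sort them for display.
    print(sorted(run_pipeline([1, 2, 3, 4, 5])))
```

A worker's results always precede its own sentinel on the output queue, so once the consumer has seen num_workers sentinels it has also seen every result.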