Python Multiprocessing Pipe is very slow (>100ms)

I'm currently writing an image processing program in Python 3.x that needs to process frames in real-time (30 FPS) with low-latency (<60ms). I have 1 parent process that reads frames and sends them to multiple child processes via a SharedMemory object. The computations done by the child processes are CPU bound and running all of them on a single core is not possible at 30 FPS. But since they work independently of each other, I decided to run them as separate processes.

Currently, I'm using Pipes to send commands to the child processes, most importantly to inform them whenever the frame is updated. On measuring the time between the send() command of the parent and the recv() command on the child, the latency is always >100ms. I used time.time_ns() for this.

This is a problem because the output feed will now always be lagging by >100ms + time taken by all the children to finish processing (another 20-30ms + the delays between all the send() functions).

The application is meant to be used on a live sports feed and therefore cannot introduce such a high latency. So I have exactly 2 questions:

  1. Are Pipes actually that slow in Python, or is something wrong with my implementation of them? (Note: I have tested the latency on an Intel i5 9th Gen as well as an Apple M1.)

  2. If Pipes are indeed this slow, do I have any other options in Python, other than resorting to some form of sockets?

Thanks.

Edit:

I've added the code I've used to test the Pipe latency here.

import multiprocessing as mp
import time

def proc(child_conn):
    
    child_conn.recv()
    ts = time.time_ns()
    child_conn.send(ts)
    child_conn.close()

if __name__ == "__main__":

    parent_conn, child_conn = mp.Pipe()
    p1 = mp.Process(target=proc, args=(child_conn,))
    p1.start()

    ts = time.time_ns()
    parent_conn.send("START")
    ts_end = parent_conn.recv()

    print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")

asked Oct 18 '25 11:10 by Abhishek Satish


1 Answer

Here is one possible solution, using the multiprocessing objects Process and Queue.

I measured its throughput: on average it takes about 150 microseconds (µs, printed as "mcs" in the output below) per task when the task does almost nothing. Each task just carries an integer; the worker adds 1 to it and sends it back. An overhead of 150 µs per task should be more than enough headroom for 30 FPS, where each frame has a budget of about 33 ms.

Queue is used instead of your Pipe, as I think it is more suitable for multi-task processing. Also, if your time measurements are precise, then Queue is roughly 670 times faster than Pipe (150 µs compared to 100 ms).

Notice that the processing loop sends tasks in batches: it first sends many tasks to all processes, and only after that gathers the results of all the tasks it sent. This kind of batching keeps the workers busy and makes processing smoother than sending just 1 task at a time and then waiting for its result.

It would be even better to send tasks to the processes and gather the results asynchronously in separate lightweight threads. That way you never block on the slowest process while the others already have results ready.
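The asynchronous gathering described above could look roughly like the sketch below. It is only an illustration, not the code I measured: the names worker, collect and run_async are made up for this example, and it assumes one collector thread per worker process that is stopped by forwarding the None sentinel.

```python
import multiprocessing as mp
import queue
import threading


def worker(in_q, out_q):
    """Child process: add 1 to each task until a None sentinel arrives."""
    while True:
        task = in_q.get()
        if task is None:
            out_q.put(None)  # forward the sentinel so the collector can stop
            break
        out_q.put({'n': task['n'] + 1})


def collect(out_q, results):
    """Parent-side thread: drain one worker's output queue into `results`."""
    while True:
        item = out_q.get()
        if item is None:
            break
        results.put(item)


def run_async(num_workers=2, tasks_per_worker=100):
    results = queue.Queue()  # thread-safe, shared by all collector threads
    workers = []
    # Start all processes first, so no threads exist yet when they are created.
    for _ in range(num_workers):
        in_q, out_q = mp.Queue(), mp.Queue()
        proc = mp.Process(target=worker, args=(in_q, out_q))
        proc.start()
        workers.append((in_q, out_q, proc))
    # One collector thread per worker (hypothetical design choice).
    threads = []
    for _, out_q, _ in workers:
        thread = threading.Thread(target=collect, args=(out_q, results))
        thread.start()
        threads.append(thread)

    # Send everything without waiting; collectors pick results up as they come.
    for n in range(tasks_per_worker):
        for in_q, _, _ in workers:
            in_q.put({'n': n})
    for in_q, _, _ in workers:
        in_q.put(None)

    for thread in threads:
        thread.join()  # returns once that worker's sentinel has been seen
    for _, _, proc in workers:
        proc.join()
    return [results.get() for _ in range(results.qsize())]


if __name__ == '__main__':
    out = run_async()
    print('Collected', len(out), 'results')
```

The collector threads are joined before the processes, so every output queue is fully drained before the parent waits for a worker to exit.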

Each process is signalled to finish and exit by sending it a None task.

def process(idx, in_q, out_q):
    while True:
        task = in_q.get()
        if task is None:
            break
        out_q.put({'n': task['n'] + 1})

def main():
    import multiprocessing, time

    queue_size = 1 << 16
    procs = []
    for i in range(multiprocessing.cpu_count()):
        in_q, out_q = [multiprocessing.Queue(queue_size) for j in range(2)]
        procs.append({
            'in_q': in_q,
            'out_q': out_q,
            'proc': multiprocessing.Process(target = process,
                kwargs = dict(idx = i, in_q = in_q, out_q = out_q)),
        })
        procs[-1]['proc'].start()

    num_blocks = 1 << 2
    block = 1 << 10
    assert block <= queue_size

    tb = time.time()
    for k in range(num_blocks):
        # Send tasks
        for i in range(block):
            for j, proc in enumerate(procs):
                proc['in_q'].put({'n': k * block * len(procs) + i * len(procs) + j})
        # Receive tasks results
        for i in range(block):
            for proc in procs:
                proc['out_q'].get()
    print('Processing speed:', round((time.time() - tb) /
        (num_blocks * block * len(procs)) * 1_000_000, 1), 'mcs per task')
    
    # Send finish signals to processes
    for proc in procs:
        proc['in_q'].put(None)
    # Join processes (wait for exit)
    for proc in procs:
        proc['proc'].join()

if __name__ == '__main__':
    main()

Output:

Processing speed: 150.7 mcs per task

I also measured the timing when sending just 1 task at a time (instead of 1024 tasks at a time) to all processes and receiving 1 result at a time. In that case the delay is 460 µs. So you can treat 460 µs as the latency of Queue in the worst case of using it (the 460 µs includes both send and receive).


I've taken your example snippet and modified it a bit to use Queue instead of Pipe, and got a delay of about 0.1 ms.

Note that I run this in a loop 5 times, because the first one or two iterations pay for one-time Queue initialization.

import multiprocessing as mp
import time

def proc(inp_q, out_q):
    for i in range(5):
        e = inp_q.get()
        ts = float(time.time_ns())
        out_q.put(ts)

if __name__ == "__main__":

    inp_q, out_q = [mp.Queue(1 << 10) for i in range(2)]
    p1 = mp.Process(target=proc, args=(inp_q, out_q))
    p1.start()

    for i in range(5):
        ts = float(time.time_ns())
        inp_q.put("START")
        ts_end = out_q.get()

        print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
    p1.join()

Output:

Time taken in ms: 2.181632
Time taken in ms: 0.14336
Time taken in ms: 0.09856
Time taken in ms: 0.156928
Time taken in ms: 0.108032

Running your own example in a loop several times also makes the second and later send/recv iterations much faster than the first one.

The first iteration is very slow because of lazily initialized resources. Many libraries initialize lazily, meaning that they allocate the resources they need only on the first call. This avoids unnecessary allocation when a feature is never used, but it makes the first call much slower, so you have to do a few empty calls first to warm things up. In your snippet the first measurement also includes the time the child process needs to start, because the parent sends immediately after start() and the child can only call recv() once it is running.

import multiprocessing as mp
import time

def proc(child_conn):
    for i in range(5):
        child_conn.recv()
        ts = time.time_ns()
        child_conn.send(ts)

if __name__ == "__main__":

    parent_conn, child_conn = mp.Pipe()
    p1 = mp.Process(target=proc, args=(child_conn,))
    p1.start()

    for i in range(5):
        ts = time.time_ns()
        parent_conn.send("START")
        ts_end = parent_conn.recv()

        print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")

    p1.join()

Output:

Time taken in ms: 2.693857
Time taken in ms: 0.072593
Time taken in ms: 0.038733
Time taken in ms: 0.039086
Time taken in ms: 0.037021
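
If you want to keep process startup and first-use initialization out of the numbers entirely, one option is a handshake followed by a few discarded warm-up rounds. The sketch below is only an illustration under some assumptions: it measures the full round trip in the parent (so it is not the same one-way figure as above), the names echo, round_trip_times_ns and steady_state_ms are made up for this example, and the warm-up count of 2 is arbitrary.

```python
import multiprocessing as mp
import statistics
import time


def echo(conn, rounds):
    """Child process: announce readiness, then echo every message back."""
    conn.send("READY")  # handshake: the child is now up and running
    for _ in range(rounds):
        conn.send(conn.recv())


def round_trip_times_ns(conn, rounds):
    """Time send + recv in the parent only, so a single clock is used."""
    samples = []
    for _ in range(rounds):
        t0 = time.perf_counter_ns()
        conn.send("PING")
        conn.recv()
        samples.append(time.perf_counter_ns() - t0)
    return samples


def steady_state_ms(samples_ns, warmup=2):
    """Median round trip in ms after dropping the first `warmup` samples."""
    return statistics.median(samples_ns[warmup:]) / 1e6


if __name__ == "__main__":
    rounds = 20
    parent_conn, child_conn = mp.Pipe()
    p1 = mp.Process(target=echo, args=(child_conn, rounds))
    p1.start()

    parent_conn.recv()  # block until the child reports READY
    samples = round_trip_times_ns(parent_conn, rounds)
    p1.join()

    print(f"First round trip in ms: {samples[0] / 1e6}")
    print(f"Steady-state median in ms: {steady_state_ms(samples)}")
```

In a real application the same idea means starting the child processes once, waiting for them to report ready, and sending a couple of dummy messages before the first real frame arrives.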

answered Oct 20 '25 00:10 by Arty


