Python - Passing a TCP socket object to a multiprocessing Queue

I have a TCP server and client. At some point in the server script I start a process, which needs to be able to get every new connection and send data to it. To do this I created a multiprocessing.Queue(), onto which I want to put every new connection from the main process, so that the spawned process can get the connections from it and send data to them. However, it seems that you cannot put arbitrary objects on a Queue. When I try to put the connection (a socket object), I get:

Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
TypeError: expected string or Unicode object, NoneType found

Are there any alternatives that I could use?
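
Roughly, my setup looks like this (a stripped-down sketch; the port number and function name here are just placeholders, not my real code):

from multiprocessing import Process, Queue
import socket

def sender(q):
    conn = q.get()  # blocks forever, because the put() in the parent fails
    conn.send('hello from the worker\n')

if __name__ == '__main__':
    server = socket.socket()
    server.bind(('', 9000))
    server.listen(1)

    q = Queue()
    Process(target=sender, args=(q,)).start()

    conn, addr = server.accept()
    q.put(conn)  # fails to serialize the socket in the queue's feeder thread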

asked by Nick K.
1 Answer

Sending a socket through a multiprocessing.Queue works fine starting with Python 3.4, because from that version on a ForkingPickler is used to serialize the objects put on the queue, and that pickler knows how to serialize sockets and other objects that wrap a file handle.
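
For example, on Python 3.4+ something like this minimal sketch should work directly, without any manual pickling (httpbin.org is only used as a reachable endpoint, as in the Python 2 example further down):

from multiprocessing import Queue, Process
from socket import socket

def handle(q):
    # on Python 3.4+ the socket arrives already unpickled and ready to use
    sock = q.get()
    print('rest:', sock.recv(2048))

if __name__ == '__main__':
    sock = socket()
    sock.connect(('httpbin.org', 80))
    sock.send(b'GET /get HTTP/1.0\r\nHost: httpbin.org\r\n\r\n')
    print('first part:', sock.recv(50))

    q = Queue()
    proc = Process(target=handle, args=(q,))
    proc.start()
    q.put(sock)  # serialized internally by the ForkingPickler
    proc.join()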

The multiprocessing.reduction.ForkingPickler class already exists in Python 2.7 and can pickle sockets; it's just not used by multiprocessing.Queue.

If you can't switch to Python 3.4+ and really need similar functionality in Python 2.7, a workaround is to create a function that uses the ForkingPickler to serialize objects, e.g.:

from multiprocessing.reduction import ForkingPickler
import StringIO

def forking_dumps(obj):
    # serialize obj (including sockets) with the ForkingPickler instead of
    # the default pickler used by multiprocessing.Queue
    buf = StringIO.StringIO()
    ForkingPickler(buf).dump(obj)
    return buf.getvalue()

Instead of putting the socket on the queue directly, you then put its pickled version and unpickle it in the consumer. A simple example:

from multiprocessing import Queue, Process
from socket import socket
import pickle

def handle(q):
    # unpickle the socket that was serialized with forking_dumps in the parent
    sock = pickle.loads(q.get())
    print 'rest:', sock.recv(2048)

if __name__ == '__main__':
    sock = socket()
    sock.connect(('httpbin.org', 80))
    sock.send(b'GET /get\r\n')
    # first bytes read in parent
    print 'first part:', sock.recv(50)

    q = Queue()
    proc = Process(target=handle, args=(q,))
    proc.start()
    # use the function from above to serialize socket
    q.put(forking_dumps(sock))
    proc.join()

Making sockets picklable only makes sense here in the context of multiprocessing; it would not make sense to write a pickled socket to a file and use it later, or to try to use it on a different machine or after the original process has ended. Therefore it wouldn't be a good idea to make sockets picklable globally (e.g. by using the copyreg mechanism).

answered by mata
