Why does using "fork" work but using "spawn" fail in Python 3.8+ `multiprocessing`?

I work on macOS and was recently bitten by the change of the default start method from "fork" to "spawn" in Python 3.8's multiprocessing (see the documentation). Below is a simplified example where using "fork" succeeds but using "spawn" fails. The purpose of the code is to create a custom queue object that supports calling size() under macOS, hence the subclassing of Queue and the fetching of multiprocessing's context.

import multiprocessing
from multiprocessing import Process
from multiprocessing.queues import Queue
from time import sleep


class Q(Queue):
    def __init__(self):
        super().__init__(ctx=multiprocessing.get_context())
        self.size = 1

    def call(self):
        return print(self.size)


def foo(q):
    q.call()


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')  # this would fail
    # multiprocessing.set_start_method('fork')  # this would succeed
    q = Q()
    p = Process(target=foo, args=(q,))
    p.start()
    p.join(timeout=1)

The error output when using "spawn" is shown below.

Process Process-1:
Traceback (most recent call last):
  File "/usr/local/Cellar/[email protected]/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/Cellar/[email protected]/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/fanchen/Private/python_work/sandbox.py", line 23, in foo
    q.call()
  File "/Users/fanchen/Private/python_work/sandbox.py", line 19, in call
    return print(self.size)
AttributeError: 'Q' object has no attribute 'size'

It seems that the child process deems self.size unnecessary for its execution and therefore does not copy it. My question is: why does this happen?

The code snippet was tested under macOS Catalina 10.15.6 with Python 3.8.5.

Asked by Fanchen Bao

1 Answer

The problem is that spawned processes do not inherit the parent's memory. Under the "spawn" start method, the Queue instance is pickled in the parent and unpickled in the child, and multiprocessing.queues.Queue defines __getstate__/__setstate__ methods that serialize only the queue's internal state, not any attributes added by a subclass. That is why self.size disappears. To recreate the queue instance correctly in each process, you need to extend the serialization and deserialization methods.
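For the Q class from the question, a minimal sketch of the fix (following the same pattern as the full implementation below) is to append the custom attribute to the queue's state tuple:

import multiprocessing
from multiprocessing.queues import Queue

class Q(Queue):
    def __init__(self):
        super().__init__(ctx=multiprocessing.get_context())
        self.size = 1

    def __getstate__(self):
        # Append the custom attribute to the queue's internal state tuple.
        return super().__getstate__() + (self.size,)

    def __setstate__(self, state):
        # Restore the internal state first, then the custom attribute.
        super().__setstate__(state[:-1])
        self.size = state[-1]

    def call(self):
        return print(self.size)

Here is the full, working code for a portable queue built the same way: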

# Portable queue
# Based on Victor Terron's idea from the Lemon project (https://github.com/vterron/lemon/blob/master/util/queue.py).
# Pickling/unpickling methods are added so that Queue instances are shared between processes correctly.

import multiprocessing
import multiprocessing.queues

class SharedCounter(object):
    """ A synchronized shared counter.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/

    """

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def __getstate__(self):
        return (self.count,)

    def __setstate__(self, state):
        (self.count,) = state

    def increment(self, n=1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value

class Queue(multiprocessing.queues.Queue):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().

    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs, ctx=multiprocessing.get_context())
        self._counter = SharedCounter(0)

    def __getstate__(self):
        return super().__getstate__() + (self._counter,)

    def __setstate__(self, state):
        super().__setstate__(state[:-1])
        self._counter = state[-1]

    def put(self, *args, **kwargs):
        super().put(*args, **kwargs)
        self._counter.increment(1)

    def get(self, *args, **kwargs):
        item = super().get(*args, **kwargs)
        self._counter.increment(-1)
        return item

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self._counter.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()
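With pickling handled, the custom queue survives the "spawn" start method. A minimal usage sketch, assuming the Queue class above is defined in the same module:

def worker(q):
    q.put('hello')
    print(q.qsize())  # 1 -- no NotImplementedError on macOS

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    q = Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    print(q.get())  # 'hello'
    p.join()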
Answered by SergeR


