Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Working with deque object across multiple processes

I'm trying to reduce the processing time of reading a database with roughly 100,000 entries, but I need them to be formatted a specific way, in an attempt to do this, I tried to use python's multiprocessing.map function which works perfectly except that I can't seem to get any form of queue reference to work across them.

I've been using information from Filling a queue and managing multiprocessing in python to guide me for using queues across multiple processes, and Using a global variable with a thread to guide me for using global variables across threads. I've gotten the software to work, but when I check the list/queue/dict/map length after running the process, it always returns zero

I've written a simple example to show what I mean: You have to run the script as a file, the map's initialize function does not work from the interpreter.

from multiprocessing import Pool
from collections import deque

global_q = deque()

def my_init(q):
    global global_q
    global_q = q
    q.append("Hello world")


def map_fn(i):
    global global_q
    global_q.append(i)


if __name__ == "__main__":
    with Pool(3, my_init, (global_q,)) as pool:
        pool.map(map_fn, range(3))
    for p in range(len(global_q)):
        print(global_q.pop())

Theoretically, when I pass the queue object reference from the main thread to the worker threads using the pool function, and then initialize that thread's global variables using with the given function, then when I insert elements into the queue from the map function later, that object reference should still be pointing to the original queue object reference (long story short, everything should end up in the same queue, because they all point to the same location in memory).

So, I expect:

Hello World
Hello World
Hello World
1
2
3

of course, the 1, 2, 3's are in arbitrary order, but what you'll see on the output is ''.

How come when I pass object references to the pool function, nothing happens?

like image 454
iggy12345 Avatar asked Nov 17 '25 04:11

iggy12345


1 Answers

Here's an example of how to share something between processes by extending the multiprocessing.managers.BaseManager class to support deques.

There's a Customized managers section in the documentation about creating them.

import collections
from multiprocessing import Pool
from multiprocessing.managers import BaseManager


class DequeManager(BaseManager):
    pass

class DequeProxy(object):
    def __init__(self, *args):
        self.deque = collections.deque(*args)
    def __len__(self):
        return self.deque.__len__()
    def appendleft(self, x):
        self.deque.appendleft(x)
    def append(self, x):
        self.deque.append(x)
    def pop(self):
        return self.deque.pop()
    def popleft(self):
        return self.deque.popleft()

# Currently only exposes a subset of deque's methods.
DequeManager.register('DequeProxy', DequeProxy,
                      exposed=['__len__', 'append', 'appendleft',
                               'pop', 'popleft'])


process_shared_deque = None  # Global only within each process.

def my_init(q):
    """ Initialize module-level global. """
    global process_shared_deque
    process_shared_deque = q
    q.append("Hello world")


def map_fn(i):
    process_shared_deque.append(i)  # deque's don't have a "put()" method.


if __name__ == "__main__":
    manager = DequeManager()
    manager.start()
    shared_deque = manager.DequeProxy()

    with Pool(3, my_init, (shared_deque,)) as pool:
        pool.map(map_fn, range(3))

    for p in range(len(shared_deque)):  # Show left-to-right contents.
        print(shared_deque.popleft())

Output:

Hello world
0
1
2
Hello world
Hello world
like image 115
martineau Avatar answered Nov 19 '25 19:11

martineau



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!