
multiprocessing with nested dictionary

Is there a way to pass a nested dictionary to multiprocessing?

d = {'a': {'x': 1, 'y':100},
     'b': {'x': 2, 'y':200}}

I was hoping to start two parallel jobs, one for {'a': {'x':1, 'y':100}} and another for {'b': {'x': 2, 'y':200}}, and use the following function to create a new dictionary

def f(d):
    key = list(d.keys())[0]   # each job receives a single-key dict
    new_d = {key: {'x': d[key]['x'] * 2,
                   'y': d[key]['y'] * 2}}
    return new_d

This was my unsuccessful attempt:

import multiprocessing

def f(key, d, container):
    container[key]['x'] = d[key]['x']*2
    container[key]['y'] = d[key]['y']*2
    
if __name__ == '__main__':
    manager = multiprocessing.Manager()
    container = manager.dict()
    d = manager.dict()
    
    d['a'] = {'x': 1, 'y':100}
    d['b'] = {'x': 2, 'y':200}
        
    p1 = multiprocessing.Process(target=f, args=('a',d, container))
    p2 = multiprocessing.Process(target=f, args=('b',d, container))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()

I get a KeyError: 'b'. Also, I would like to avoid having to specify each process manually (p1, p2, and so on). Is there maybe another way?

asked Aug 31 '25 by HappyPy

1 Answer

@nonDucor is right: you have to create the nested dictionaries using the Manager object. A manager dict only synchronizes its own top-level keys; a plain dict stored as a value is handed to each process as a copy, so in-place updates to it never reach the shared state. In your attempt, container also starts out empty, so container[key] raises a KeyError right away.

The following is an abbreviated solution using more Pythonic dictionary creation, as well as using the ProcessPoolExecutor interface for concurrency:

from concurrent.futures import ProcessPoolExecutor as Executor
import multiprocessing

def f(key, d, container):
    container[key]['x'] = d[key]['x'] * 2
    container[key]['y'] = d[key]['y'] * 2

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    d = manager.dict({
        'a': manager.dict({'x': 1, 'y': 100}),
        'b': manager.dict({'x': 2, 'y': 200}),
    })
    container = manager.dict({x: manager.dict() for x in d.keys()})
    with Executor() as exe:
        exe.submit(f, 'a', d, container)
        exe.submit(f, 'b', d, container)
        
    for the_dict in (d, container):
        print([the_dict[x].items() for x in the_dict.keys()])

For comparison, below we use multithreading instead of multiprocessing. Since both threads share the same memory, there's no need for manager-protected dictionaries; plain old dicts work just fine. But to make the target dictionary more dynamic and independent of the source dict at creation time, we use a defaultdict of defaultdicts:

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor as Executor

def f(key, d, container):
    container[key]['x'] = d[key]['x'] * 2
    container[key]['y'] = d[key]['y'] * 2

if __name__ == '__main__':
    d = {
        'a': {'x': 1, 'y': 100},
        'b': {'x': 2, 'y': 200},
    }
    container = defaultdict(lambda: defaultdict(int))
    with Executor() as exe:
        exe.submit(f, 'a', d, container)
        exe.submit(f, 'b', d, container)
        
    for the_dict in (d, container):
        print([the_dict[x].items() for x in the_dict.keys()])
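The defaultdict of defaultdicts auto-creates missing entries on first access, which is what lets f write into container without pre-building its structure. A minimal illustration:

```python
from collections import defaultdict

container = defaultdict(lambda: defaultdict(int))
container['c']['x'] += 7     # neither 'c' nor 'x' existed; both are created on first access
print(dict(container['c']))  # {'x': 7}
```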
answered Sep 02 '25 by Velimir Mlaker