Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Best way to feed Data into Multiprocess

I have some ambiguity in me. I'm novice. I have reads that Multiprocessing are making local copies of Global Variable on each Process. However, this only applies on Windows, since it creates new instance of Python. Meanwhile, in Linux, Child Process are forked into Parent.

Now, I have Global Variable that contains states of User-Choices. Taking a similar approach to switch statement using dictionary in Python.

case = { 'lead_type': 0, 'deep': 0 }

All this time, I store them first in multiprocessing.Manager() with the idea: it will help to avoid local copy inside each Process. But, Manager() create a new Proxy to share the data across Processes, which considered slow. Thus, multiprocessing.Value() is faster because it creates a pointer for that case object.

Knowing all of that, should I stick with the use of Manager() or global? Or can I put dictionary object into Value() somehow? Is using Value() considered a good practice?

EDIT:

import re
import csv
import json
import socket
import multiprocessing
from collections import defaultdict
from multiprocessing import Manager, Process, cpu_count, Queue

#This is the Target Dictionary
case = { 'lead_type': 0, 'file_type': 0, 'deep': 0 }

def websocket_tls(proc, cases):
    ...

def consumer(procs, cases):
    while True:
        proc = procs.get()
        if proc is None:
            break
        if cases['lead_type'] == 1:
            if cases['deep'] == 1:
                websocket_tls(proc, cases)
            elif cases['deep'] == 2:
                websocket_insecure(proc, cases)
            elif cases['deep'] == 3:
                gorilla_tls(proc, cases)
            else:
                gorilla(proc, cases)
        else:
            break

def producer(procs, cases):
    plist = []
    for i in range(cpu_count()):
        procs.put(None)
        p = Process(target = consumer, args = (procs, cases))
        p.start()
        total.append(p)
    for p in plist:
        p.join()

# Another function also use Case
def processor(file):
    procs = Queue(cpu_count()*10)
    if case['deep'] == 0:
        with open('payloads.json') as f:
            payloads = json.loads(f)

    #This is my ambiguity.
    #Should I keep case global or readded it to Manager to avoid copies? :
    cases = Manager().dict()
    for i, v in case.items():
        cases[i] = v
    cases['payload'] = payloads

    columns = defaultdict(list)
    if case['file_type'] == 0:
        with open(file, 'r') as f:
            for line in f:
                liner = [line] + list(islice(f, cpu_count()))
                for i in liner:
                    procs.put(i.strip())
                producer(procs, cases)
    elif case['file_type'] == 1:
        with open(file, 'r') as f:
            reader = csv.reader(f)
            for rows in reader:
                for (i,v) in enumerate(row):
                    columns[i].append(v)
                procs.put(columns[9] + columns[3])
                producer(procs, cases)
    else:
        f = requests.get(file).text
        f = re.findall('(.*?),', f)
        for line in f:
            liner = [line] + list(islice(f, cpu_count()))
            for i in liner:
                procs.put(i.strip())
            producer(procs, cases)
    print('Result: ' + str(cases['Result']))
    print('Scrape: ' + str(cases['Scrape']))
    return

# Craft case identifying the correct file_type
def choose_file():
    print('1. Use .txt file')
    print('2. Use .csv file')
    print('3. Use Online Repository')
    answer = input('Choose: ')
    print()
    if answer == '1':
        case['file_type'] = 0
    elif answer == '2':
        case['file_type'] = 1
    else:
        case['file_type'] = 2
    file = input('Choose Location: ')
    return file

# Craft case to appropiate function with it's own deep
def menu():
    print('1) Test Alive Subdomain')
    print('2) Test Alive Proxy')
    print('3) Test Alive SNI')
    answer = input('Choose: ')
    print('')
    if asnwer == '1':
        print('1) Websocket Secure')
        print('2) Websocket Insecure')
        print('3) Websocket Secure with Gorilla')
        print('4) Websocket Insecure with Gorilla')
        answer = input('Choose: ')
        print('')
        if asnwer == '1':
            case['lead_type'] = 0
            case['deep'] = 0
        elif answer == '2':
            case['lead_type'] = 1
            case['deep'] = 0
        elif asnwer == '3':
            case['lead_type'] = 0
            case['deep'] = 1
        else:
            case['lead_type'] = 1
            case['deep'] = 1
    file = choose_file()
    processor(file)
    exit()
menu()

Added example to make it more clear @Booboo. As you can see, there's many function that accesses and modify the case, there's more actually. So, I can't make the case local, it stay global. The problem remains, should it keep global or need to be passed to Manager().dict() or even multiprocessing.Value() to avoid copies of case.

Also, good to mention that the case has more suplement into it such as payloads on processor(). Note that there's more data to feed, it can be more bigger, depends on the user choices in menu(). After data feeding is completed, it's time for multiprocessing to determine result.

Analogy:

Most Functions -> Modify / Craft case -> Final case -> Manager().dict() / Stay global / multiprocessing.Value -> Producer & Consumer. case stays on Read-Only.

like image 703
MC874 Avatar asked Dec 06 '25 20:12

MC874


1 Answers

Update

Based on your updated code/description where the dictionary has no keys added or deleted but integer values of existing keys might be changed by a process and the change reflected to other processes using the dictionary, then there are two approaches:

  1. Use a managed dictionary (e.g. multiprocessing.Manager().dict()). As you have noted, accessing or modifying key values are somewhat expensive since it results in making a remote method call to the actual dictionary that resides in a process created by the SyncManager instance. But multiprocessing is only advantageous if your worker function(s) are sufficiently CPU-intensive such that what is gained by running these functions in parallel more than offsets what is lost due to the additional overhead incurred by multiprocessing (e.e. process creation and inter-process communication). So you need to ask how much of your total processing is represented by the accessing of the dictionary values. If it is a significant portion then this is not the best approach but perhaps also your processes are not sufficiently CPU-intensive to warrant a multiprocessing approach.
  2. Use multiprocessing.Value instances as the values for the dictionary. Each process will have its own copy of the dictionary but the values stored are sharable because they reside in shared memory. These instances can be created with or without a lock. Updates under control of a lock to force updates to be "one-at-a-time" would be required if the updating logic is not atomic. For example, if you decide that a new value for key 'x' needs to be replaced and this new value is not based on the value of something that is being shared (which might get updated while you are executing your update logic), then you just need a statement such as d['x'].value = new_value. However, an update such as d[x].value += 1 needs to be done under control of a lock: with d['x'].get_lock(): d['x'].value += 1.

In the code below, which is a modified version of my third coding example that I originally posted (since this version is closes to the approach you are taking, i.e. using multiprocessing.Process instances with a multiprocssing.Queue), I am following the second approach outlined above and using a regular dictionary with multiprocessing.Value instances as the values. I have added code just to show how an update made by one process is visible to the other processes. Normally, the way I am updating a value, i.e. first reading the current value and then replacing the value with a new value computed from the current value, would require that these operations be done under control of a lock to ensure that no other process can be modifying this key's value while this non-atomic logic is being performed. I am, however, not doing this under control of a lock just because I don't believe that the way you are updating values requires a lock and I didn't want to imply otherwise.

from multiprocessing import Process, cpu_count, Queue, Value, current_process

def user_choices():
    props = {'deep': Value('i', 0, lock=False)}
    lead_type = input('Put Leader type: ')
    props['lead_type'] = Value('i', int(lead_type), lock=False)
    return props

def processing():
    props = user_choices()
    processors = [ 'blah', 'blah', 'blah', 'blah', 'blah', 'blah', 'blah', 'blah' ]
    queue = Queue()
    n_processors = cpu_count()
    for p in processors:
        queue.put(p)
    for _ in range(n_processors):
        # Use None as the sentinel so it cannot be mistaken for actual data:
        queue.put(None)

    procs = [Process(target=processes, args=(queue, props)) for _ in range(n_processors)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

def processes(queue, props):
    import time

    while True:
        process = queue.get()
        if process is None: # Sentinel
            break
        v = props['lead_type']
        if v.value == 1:
            new_value = 2
        else:
            new_value = 1
        v.value = new_value
        print(f"Process {current_process().pid} is setting props['lead_type'] to {new_value}: {process}")
        time.sleep(.1) # Give other processes a chance to rum

# For platforms that use *spawn*, such as windows:
if __name__ == '__main__':
    processing()

Prints:

Put Leader type: 1
Process 8764 is setting props['lead_type'] to 2: blah
Process 23740 is setting props['lead_type'] to 1: blah
Process 15276 is setting props['lead_type'] to 2: blah
Process 8984 is setting props['lead_type'] to 1: blah
Process 18300 is setting props['lead_type'] to 2: blah
Process 10740 is setting props['lead_type'] to 1: blah
Process 17796 is setting props['lead_type'] to 2: blah
Process 7880 is setting props['lead_type'] to 1: blah

Original Posting

A few observations:

  1. Your current code will result in all of your children processes except for one never terminating: You need to put N instances of 'ends' to the queue where N is the number of child processes you are creating so that each process knows that there is no more data. Right now you have only put a single instance of 'ends' on the queue so only one child process can get it and the others will wait indefinitely doing a blocking get against an empty queue.
  2. You can greatly simplify the code and use a standard dictionary by using a multiprocessing pool, e.g. a militprocessing.Pool. The idea is to have the main process initialize the props dictionary and then initialize within each pool process' address space a global variable pool initialized appropriately. Alternatively, you could pass the props dictionary as an an argument to processes. Of course, in your current code you could have also passed it as additional argument.
  3. Multiprocessing will perform worse unless function processes is sufficiently CPU-intensive.
from multiprocessing import Pool

def init_pool_processes(the_props):
    """
    Initialize global variable props for each pool process.
    """
    global props

    props = the_props

def user_choices():
    props = {'deep': 0}
    lead_type = input('Put Leader type: ')
    props['lead_type'] = int(lead_type)
    return props

def processing():
    props = user_choices()
    processors = [ 'blah', 'blah', 'blah', 'blah', 'blah', 'blah', 'blah', 'blah' ]

    with Pool(initializer=init_pool_processes, initargs=(props,)) as pool:
        pool.map(processes, processors)

def processes(process):
    print(f"Lead {props['lead_type']}: {process}")

# For platforms that use *spawn*, such as windows:
if __name__ == '__main__':
    processing()

If you want to use individual multiprocessing.Process instances then a simpler approach would be to use a multiprocessing.JoinableQueue with daemon child processes.

from multiprocessing import Process, cpu_count, JoinableQueue

def user_choices():
    props = {'deep': 0}
    lead_type = input('Put Leader type: ')
    props['lead_type'] = int(lead_type)
    return props

def processing():
    props = user_choices()
    processors = [ 'blah', 'blah', 'blah', 'blah', 'blah', 'blah', 'blah', 'blah' ]
    queue = JoinableQueue()
    for p in processors:
        queue.put(p)

    for _ in range(cpu_count()):
        # A daemon process will terminate automatically when the main
        # process terminates:
        Process(target=processes, args=(queue, props), daemon=True).start()

    # Wait for all queue items to have been processed:
    queue.join()

def processes(queue, props):
    while True:
        process = queue.get()
        print(f"Lead {props['lead_type']}: {process}")
        queue.task_done() # Show we have finished processing the item

# For platforms that use *spawn*, such as windows:
if __name__ == '__main__':
    processing()

If you use a multiprocessing.Queue instance, then you need to put N sentinel items (where N is the number of child processes) on the queue that signify there is no more data to be gotten from the queue. I think that using None is a better choice of a sentinel value than 'ends' as it could not possibly be mistaken for actual data:

from multiprocessing import Process, cpu_count, Queue

def user_choices():
    props = {'deep': 0}
    lead_type = input('Put Leader type: ')
    props['lead_type'] = int(lead_type)
    return props

def processing():
    props = user_choices()
    processors = [ 'blah', 'blah', 'blah', 'blah', 'blah', 'blah', 'blah', 'blah' ]
    queue = Queue()
    n_processors = cpu_count()
    for p in processors:
        queue.put(p)
    for _ in range(n_processors):
        # Use None as the sentinel so it cannot be mistaken for actual data:
        queue.put(None)

    procs = [Process(target=processes, args=(queue, props)) for _ in range(n_processors)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

def processes(queue, props):
    while True:
        process = queue.get()
        if process is None: # Sentinel
            break
        print(f"Lead {props['lead_type']}: {process}")

# For platforms that use *spawn*, such as windows:
if __name__ == '__main__':
    processing()

All three code examples above result in the following output:

Put Leader type: 1
Lead 1: blah
Lead 1: blah
Lead 1: blah
Lead 1: blah
Lead 1: blah
Lead 1: blah
Lead 1: blah
Lead 1: blah
like image 190
Booboo Avatar answered Dec 08 '25 09:12

Booboo



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!