I realize I could use the Pool class and probably get what I needed, but I want a little finer control over my problem. I have more jobs than I do processors, so I don't want them to run all at one time.
For instance:
from multiprocessing import Process,cpu_count
for dir_name in directories:
src_dir = os.path.join(top_level,dir_name)
dst_dir = src_dir.replace(args.src_dir,args.target_dir)
p = Process(target=transfer_directory, args=(src_dir, dst_dir,))
p.start()
However, if I have more than 16 directories, I then will start more jobs than I have processors. Here was my solution that is really hack.
from multiprocessing import Process,cpu_count
jobs = []
for dir_name in directories:
src_dir = os.path.join(top_level,dir_name)
dst_dir = src_dir.replace(args.src_dir,args.target_dir)
p = Process(target=transfer_directory, args=(src_dir, dst_dir,))
jobs.append(p)
alive_jobs = []
while jobs:
if len(alive_jobs) >= cpu_count():
time.sleep(5)
print alive_jobs
for aj in alive_jobs:
if aj.is_alive():
continue
else:
print "job {} removed".format(aj)
alive_jobs.remove(aj)
continue
for job in jobs:
if job.is_alive():
continue
job.start()
alive_jobs.append(job)
print alive_jobs
jobs.remove(job)
if len(alive_jobs) >= cpu_count():
break
Is there a better solution using the built in tools?
I wanna share my idea here: create number of processes equals to cpu_count(), use a Queue stores all your directories,and pass the Queue into your transfer_directory
method, takes a dir_name
out from the Queue once a process finishes its work. A draft looks like this:
NUM_OF_PROCESSES = multiprocessing.cpu_count()
TIME_OUT_IN_SECONDS = 60
for dir_name in directories:
my_queue.put(dir_name)
# creates processes that equals to number of CPU
processes = [multiprocessing.Process(target=transfer_directory, args=(my_queue,)) for x in range(NUM_OF_PROCESSES)]
# starts processes
for p in processes:
p.start()
# blocks the calling thread
for p in processes:
p.join()
def transfer_directory(my_queue):
"""processes element of directory queue if queue is not empty"""
while my_queue is not empty:
dir_name = my_queue.get(timeout=TIME_OUT_IN_SECONDS)
src_dir = os.path.join(top_level,dir_name)
dst_dir = src_dir.replace(args.src_dir,args.target_dir)
Edit:
It also works efficient for reading a large file.
I was struggling how to read a huge file(it was more than 10 million lines) using multiprocessing
for a while, and finally I use a single process starts producer(a_queue)
that just reads and puts lines into the queue, and then start multiple consumers(a_queue)
to take lines from a_queue
and do time-consuming work, and it works properly for me.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With