I have a generator object that loads quite a big amount of data and hogs the I/O of the system. The data is too big to fit into memory all at once, hence the generator. And I have a consumer that uses all of the CPU to process the data yielded by the generator; it does not consume many other resources. Is it possible to interleave these tasks using threads?
For example, I'd guess it should be possible to run the simplified code below in 11 seconds: 10 seconds of generation overlapping with 10 seconds of lock-serialized consumption, offset by one second.
import time, threading

lock = threading.Lock()

def gen():
    for x in range(10):
        time.sleep(1)
        yield x

def con(x):
    lock.acquire()
    time.sleep(1)
    lock.release()
    return x + 1
However, the simplest application of threads does not run in that time. It does speed things up, but I assume that is due to parallelism between the dispatcher (which does the generation) and the worker, not to parallelism between workers.
import joblib
%time joblib.Parallel(n_jobs=2,backend='threading',pre_dispatch=2)((joblib.delayed(con)(x) for x in gen()))
# CPU times: user 0 ns, sys: 0 ns, total: 0 ns
# Wall time: 16 s
Send your data to separate worker threads. I used concurrent.futures because I like the simple interface.
This runs in about 11 seconds on my computer.
from concurrent.futures import ThreadPoolExecutor
import concurrent
import threading
import time

lock = threading.Lock()

def gen():
    for x in range(10):
        time.sleep(1)
        yield x

def con(x):
    lock.acquire()
    time.sleep(1)
    lock.release()
    return f'{x+1}'

if __name__ == "__main__":
    futures = []
    with ThreadPoolExecutor() as executor:
        t0 = time.time()
        for x in gen():
            futures.append(executor.submit(con, x))
        results = []
        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())
        print(time.time() - t0)
        print('\n'.join(results))
With 100 generator iterations (def gen(): for x in range(100):) it took about 102 seconds.
Your process may need to keep track of how much data has been sent to tasks that haven't finished to prevent swamping memory resources.
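One way to sketch that bookkeeping is to gate submissions with a semaphore so that only a bounded number of payloads are in flight at once; the producer blocks until a worker finishes. The `MAX_PENDING` constant and the `bounded_submit` helper below are illustrative names, not part of concurrent.futures, and the `x + 1` callable stands in for the real `con`.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

MAX_PENDING = 4  # at most this many payloads held in memory at once

def bounded_submit(executor, fn, iterable):
    """Submit items from iterable, blocking the producer when MAX_PENDING
    tasks are outstanding; each completed future frees one slot."""
    slots = threading.Semaphore(MAX_PENDING)
    futures = []
    for item in iterable:
        slots.acquire()                         # block when all slots are taken
        fut = executor.submit(fn, item)
        fut.add_done_callback(lambda f: slots.release())  # free a slot on completion
        futures.append(fut)
    return futures

with ThreadPoolExecutor(max_workers=2) as ex:
    futures = bounded_submit(ex, lambda x: x + 1, range(10))
results = [f.result() for f in futures]
```

Because the futures list preserves submission order, `results` comes back in input order even though completion order may differ.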
Adding some diagnostic prints to con seems to show that there might be at least two chunks of data out there at a time.
def con(x):
    print(f'{x} received payload at t0 + {time.time()-t0:3.3f}')
    lock.acquire()
    time.sleep(1)
    lock.release()
    print(f'{x} released lock at t0 + {time.time()-t0:3.3f}')
    return f'{x+1}'
I created this question to see if there was an idiomatic drop-in replacement for the for-loop pattern. While wwii's answer does solve the problem, it has a caveat: the generator may get ahead of the consumer threads and swamp memory if its output is sizeable. I also like the joblib interface more.
The problem with the joblib code in the question is that gen is iterated in the main thread, so instead of dispatching jobs it spends its time waiting on gen. I've given up on trying to make sense of joblib's scheduling when the input generator is slow; it is just too weird. I did, however, manage to get it to do the right thing by moving both the producer and the consumer inside the delayed function.
When the length of the iterable is known beforehand (e.g. a list of files to be processed one by one), the code is simple. The code below ensures that at any given time only one thread is generating data and only one thread is consuming it.
sync_gen, sync_con = threading.Lock(), threading.Lock()

@joblib.delayed
def work(iterable):
    with sync_gen:
        x = next(iterable)
    with sync_con:
        return con(x)

N = 10
iterable = gen()
res1 = joblib.Parallel(2, 'threading')(work(iterable) for x in range(N))
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
If the generator length is not known, the thread workers are better off accumulating their results rather than processing a single input.
sync_gen, sync_con = threading.Lock(), threading.Lock()

def thread_safe(gen):
    try:
        while True:
            with sync_gen:
                x = next(gen)
            yield x
    except StopIteration:
        pass

def work2(safe_iterable):
    res = []
    for x in safe_iterable:
        with sync_con:
            res.append(con(x))
    return res

iterable = gen()
de_work2 = joblib.delayed(work2)
res2 = joblib.Parallel(2, 'threading')(de_work2(thread_safe(iterable)) for x in range(2))
# [[1, 3, 5, 7, 9], [2, 4, 6, 8, 10]]
Or with ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor

iterable = gen()
with ThreadPoolExecutor() as e:
    futures = [e.submit(work2, thread_safe(iterable)) for x in range(2)]
    res = [future.result() for future in futures]
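The same bounded producer/consumer split can also be expressed with the standard library alone: a queue.Queue with a small maxsize gives the generator back-pressure (it blocks when the queue is full), which addresses the memory caveat directly. This is a minimal sketch; `_SENTINEL`, `feeder`, and `worker` are illustrative names, and `x + 1` stands in for the real `con`.

```python
import threading
import queue

_SENTINEL = object()  # marks end of input for each worker

def feeder(it, q, n_workers):
    """Producer: push items into the bounded queue, then one stop
    signal per worker."""
    for x in it:
        q.put(x)                      # blocks when the queue is full
    for _ in range(n_workers):
        q.put(_SENTINEL)

def worker(q, out):
    """Consumer: drain the queue until the stop signal arrives."""
    while True:
        x = q.get()
        if x is _SENTINEL:
            break
        out.append(x + 1)             # stand-in for con(x)

q = queue.Queue(maxsize=2)            # bounds how far gen can run ahead
out = []
workers = [threading.Thread(target=worker, args=(q, out)) for _ in range(2)]
feed = threading.Thread(target=feeder, args=(iter(range(10)), q, 2))
for t in workers + [feed]:
    t.start()
for t in workers + [feed]:
    t.join()
```

Unlike the joblib version, the results arrive in completion order rather than grouped per worker, so they may need sorting afterwards.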