I'm trying to send a 2 GB CPython read-only object (it can be pickled) to dask distributed workers via apply(). This ends up consuming a lot of memory in the processes/threads (14+ GB).
Is there a way to load the object only once into memory and have the workers concurrently use the object?
I have two dask Series, Source_list and Pattern_list, containing 7 million and 3 million strings respectively. I'm trying to find all sub-string matches in Source_list (7M) from Pattern_list (3M).
To speed up the sub-string search, I use the pyahocorasick package to create a CPython data structure (a class object) from Pattern_list (the object is picklable).
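For context, this is roughly how the pyahocorasick automaton behaves; the words and text below are made-up toy data, not the real pattern list:

import ahocorasick

# build an automaton from a handful of toy patterns (illustrative data only)
A = ahocorasick.Automaton()
for word in ["he", "she", "his", "hers"]:
    A.add_word(word, word)
A.make_automaton()

# A.iter(text) yields an (end_index, value) pair for every pattern occurrence;
# in "ushers" it finds "she" and "he" (ending at index 3) and "hers" (index 5)
for end_index, value in A.iter("ushers"):
    print(end_index, value)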
Running the code below, the workers quickly hit their memory limit:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory?
Process memory: 2.85 GB -- Worker memory limit: 3.00 GB
Running with dask distributed and the memory limit increased to 8 GB/16 GB:

Threads:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory?
Process memory: 14.5 GB -- Worker memory limit: 16.00 GB
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting

Processes:

Takes more than 2.5 hours to process, and I've never seen it finish (I left it running for 8+ hours before cancelling). It also consumes 10+ GB of memory.

A plain

Source_list.str.find_all(Pattern_list)

also takes more than 2.5 hours.

Here is my setup and code:

# OS = Windows 10
# RAM = 16 GB
# CPU cores = 8
# dask version 1.1.1
import dask.dataframe as dd
import ahocorasick
from dask.distributed import Client, progress
def create_ahocorasick_trie(pattern_list):
    A = ahocorasick.Automaton()
    for index, item in pattern_list.iteritems():
        A.add_word(item, item)
    A.make_automaton()
    return A
if __name__ == '__main__':
    client = Client(memory_limit="12GB", processes=False)
    # Using threading because the large_object seems to get copied in memory
    # for each process when processes=True

    Source_list = dd.read_parquet("source_list.parquet")
    Pattern_list = dd.read_parquet("pattern_list.parquet")
    # Note: 'source_list.parquet' and 'pattern_list.parquet' are generated via dask

    large_object = create_ahocorasick_trie(Pattern_list)

    # iter() is an ahocorasick CPython method that yields (end_index, value) matches;
    # collect them into a list for each source string
    result = Source_list.apply(lambda source_text: list(large_object.iter(source_text)),
                               meta=(None, 'O'))

    progress(result.head(10))
    client.close()
The short answer is to wrap it in a dask.delayed call:
big = dask.delayed(big)
df.apply(func, extra=big)
Dask will move it around as necessary and treat it as its own piece of data. That said, it will still need to exist on every worker, so you should have significantly more RAM per worker than that object takes up (at least 4x or so).
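Applied to the code in the question, a minimal sketch might look like the following. It reuses the question's create_ahocorasick_trie helper, parquet file names, and the assumption that Source_list and Pattern_list behave like dask Series of strings; the find_matches helper, computing Pattern_list into memory before building the automaton, and passing the delayed object through apply's keyword arguments (following the answer's df.apply(func, extra=big) pattern) are illustrative assumptions, not a tested solution:

import dask
import dask.dataframe as dd
import ahocorasick
from dask.distributed import Client

def create_ahocorasick_trie(pattern_list):
    # same helper as in the question: build one automaton from all patterns
    A = ahocorasick.Automaton()
    for index, item in pattern_list.iteritems():
        A.add_word(item, item)
    A.make_automaton()
    return A

def find_matches(source_text, automaton=None):
    # collect every (end_index, matched_pattern) pair found in one source string
    return list(automaton.iter(source_text))

if __name__ == '__main__':
    client = Client(memory_limit="12GB", processes=False)

    Source_list = dd.read_parquet("source_list.parquet")
    Pattern_list = dd.read_parquet("pattern_list.parquet")

    # Build the automaton once on the client from the materialized patterns,
    # then wrap it in dask.delayed so dask ships it as a single piece of data
    # instead of serializing it into every task.
    large_object = dask.delayed(create_ahocorasick_trie(Pattern_list.compute()))

    # Pass the delayed object through apply's keyword arguments, as in the
    # answer's df.apply(func, extra=big) pattern.
    result = Source_list.apply(find_matches, automaton=large_object, meta=(None, 'O'))

    print(result.head(10))
    client.close()

Building the automaton from the computed (in-memory) pattern list avoids iterating a lazy dask collection inside create_ahocorasick_trie; the memory caveat above still applies, since every worker ends up holding a copy of the automaton.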