Building on this post, I implemented the custom mode aggregation, but I am running into performance problems with it. When the computation reaches this aggregation, my cluster uses only one of my threads. I am computing the mode of more than 150 attributes (mostly categorical data) across 16k rows, which I think could be split across individual threads or processes and combined into a single dataframe afterwards. Note that the aggregation has to group on two columns, so I may be getting worse performance from not being able to use a single column as an index.
Is there a way to incorporate dask futures or parallel processing into the aggregate calculation?
import dask.dataframe as dd
from dask.distributed import Client
from pandas import DataFrame


def chunk(s):
    # Count the values within each group of one partition.
    return s.value_counts()


def agg(s):
    # Sum the per-partition counts for every (group, value) pair.
    s = s._selected_obj
    return s.groupby(level=list(range(s.index.nlevels))).sum()


def finalize(s):
    # s is a multi-index series of the form (group, value): count. First
    # manually group on the group part of the index. The lambda will receive a
    # sub-series with multi index. Next, drop the group part from the index.
    # Finally, determine the index with the maximum value, i.e., the mode.
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
    )


def main() -> DataFrame:
    client = Client('scheduler:8786')
    ddf = dd.read_csv('/sample/data.csv')
    custom_mode = dd.Aggregation('custom mode', chunk, agg, finalize)
    result = ddf.groupby(['a', 'b']).agg(custom_mode).compute()
    return result
Side note: I am using Docker to spin up my scheduler and workers from the daskdev/dask (2.18.1) Docker image.
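For readers unfamiliar with the three-stage aggregation above, here is a minimal standard-library sketch of the same idea. It is an illustration only, not dask code: the partitions, group keys, and values are made up, and `Counter` stands in for `value_counts()`. Each partition is counted on its own, the partial counts are summed, and only then is the most frequent value picked per group.

```python
from collections import Counter, defaultdict


def chunk(rows):
    """Count (group, value) pairs within a single partition."""
    return Counter(rows)


def agg(partials):
    """Sum the per-partition counts into one global count."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total


def finalize(total):
    """Pick the most frequent value for every group."""
    by_group = defaultdict(Counter)
    for (group, value), count in total.items():
        by_group[group][value] += count
    return {group: counts.most_common(1)[0][0]
            for group, counts in by_group.items()}


# Hypothetical data: two partitions of ((a, b), value) rows.
partitions = [
    [(("x", 1), "red"), (("x", 1), "blue"), (("y", 2), "green")],
    [(("x", 1), "red"), (("y", 2), "green"), (("y", 2), "red")],
]

modes = finalize(agg(chunk(p) for p in partitions))
print(modes)  # {('x', 1): 'red', ('y', 2): 'green'}
```

The mode cannot be computed per partition and then merged, which is why the counts are carried all the way to the final step.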
In the end, I used futures to parallelize the aggregation across columns. Since I had so many columns, submitting each column's aggregation as its own task to the workers saved me a lot of time. Thanks to David for his comments, and to the article on parallel workloads in the dask documentation!
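import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client
from pandas import DataFrame


def chunk(s):
    # Count the values within each group of one partition.
    return s.value_counts()


def agg(s):
    # Sum the per-partition counts for every (group, value) pair.
    s = s._selected_obj
    return s.groupby(level=list(range(s.index.nlevels))).sum()


def finalize(s):
    # Drop the group levels and take the value with the highest count.
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
    )


def delayed_mode(ddf, groupby, col, custom_agg):
    # Runs as a task on a worker: the per-group mode of a single column.
    return ddf.groupby(groupby).agg({col: custom_agg}).compute()


def main() -> DataFrame:
    client = Client('scheduler:8786')
    ddf = dd.read_csv('/sample/data.csv')
    custom_mode = dd.Aggregation('custom mode', chunk, agg, finalize)
    groupby = ["a", "b"]
    # Assumption: every non-grouping column of ddf gets its own future.
    futures = []
    for col in ddf.columns:
        if col in groupby:
            continue
        future = client.submit(delayed_mode, ddf, groupby, col, custom_mode)
        futures.append(future)
    # Each gathered result is a pandas object indexed by (a, b).
    dfs = client.gather(futures)
    result = pd.concat(dfs, axis=1)
    return result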
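The shape of this fan-out (one task per column, gather, then combine) can be tried without a cluster. The sketch below is a stand-in under stated assumptions: `concurrent.futures.ThreadPoolExecutor` replaces `client.submit` and `client.gather`, the rows and the `column_mode` helper are hypothetical, and plain dictionaries replace dataframes. It shows the structure only. Threads in CPython will not speed up pure-Python counting the way separate dask workers can.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

# Hypothetical rows; "a" and "b" are the grouping columns.
rows = [
    {"a": 1, "b": "u", "color": "red", "size": "S"},
    {"a": 1, "b": "u", "color": "red", "size": "M"},
    {"a": 1, "b": "u", "color": "blue", "size": "M"},
    {"a": 2, "b": "v", "color": "green", "size": "L"},
]


def column_mode(rows, groupby, col):
    """Return (col, {group_key: most frequent value of col})."""
    counts = {}
    for row in rows:
        key = tuple(row[g] for g in groupby)
        counts.setdefault(key, Counter())[row[col]] += 1
    return col, {key: c.most_common(1)[0][0] for key, c in counts.items()}


groupby = ["a", "b"]
value_cols = [c for c in rows[0] if c not in groupby]

# One future per column, mirroring the client.submit loop.
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(column_mode, rows, groupby, col)
               for col in value_cols]
    per_column = dict(f.result() for f in futures)

# Combine the per-column results side by side, like pd.concat(axis=1).
result = {}
for col, modes in per_column.items():
    for key, value in modes.items():
        result.setdefault(key, {})[col] = value

print(result)
```

Because every column is independent, the tasks share no state and the combine step is a plain join on the group key.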