
Is it possible to get the intersection of sets using dask?

I have a big dataset (50 million rows) in which I need to do some row-wise computations like getting the intersection of two sets (each in a different column)

e.g.

col_1:{1587004, 1587005, 1587006, 1587007}
col_2:{1587004, 1587005}
col_1.intersection(col_2) = {1587004, 1587005}
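The operation itself is plain Python set intersection, row by row; a minimal illustration with the values from the example above:

```python
# Two set-valued cells from one row
a = {1587004, 1587005, 1587006, 1587007}
b = {1587004, 1587005}

# Row-wise result we want in a third column
print(a.intersection(b))  # {1587004, 1587005}
```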

This works fine on my dummy dataset (100,000 rows). However, when I try the same on the actual one, memory runs out.

My code works with pandas, but porting it 1:1 to dask fails with: NotImplementedError: Series getitem in only supported for other series objects with matching partition structure

Playing around with map_partitions has not worked so far either.

working code:

df["intersection"] = [col_1.intersection(col_2) for col_1, col_2 in zip(df.col_1, df.col_2)]

Replacing the pandas df with a dask df raises the NotImplementedError above:

ddf["intersection"] = [col_1.intersection(col_2) for col_1, col_2 in zip(ddf.col_1, ddf.col_2)]

Using map_partitions "works", but I do not get how to assign the result to my existing ddf:

def intersect_sets(row, col_1, col_2):
    # row is a pandas Series holding one row of the partition
    return row[col_1].intersection(row[col_2])

newCol = ddf.map_partitions(
    lambda df: df.apply(lambda row: intersect_sets(row, "col_1", "col_2"), axis=1),
    meta=str,
).compute()
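map_partitions simply calls the given function once per underlying pandas DataFrame, so the row-wise logic can be checked on plain pandas first. A minimal sketch with small made-up sets (not the question's real data):

```python
import pandas as pd

def intersect_sets(row, col_1, col_2):
    # row is a pandas Series: one row of the partition
    return row[col_1].intersection(row[col_2])

pdf = pd.DataFrame({
    "col_1": [{1, 2, 3}, {4, 5}],
    "col_2": [{2, 3}, {5, 6}],
})

# This is exactly what each partition sees inside map_partitions
result = pdf.apply(lambda row: intersect_sets(row, "col_1", "col_2"), axis=1)
print(result.tolist())  # [{2, 3}, {5}]
```

If this works on a single pandas DataFrame, wrapping it in map_partitions runs the same code on every partition of the dask dataframe.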

just doing:

ddf['result'] = newCol

Leads to: ValueError: Not all divisions are known, can't align partitions. Please use set_index to set the index.

Update: resetting the index removes the error, but then the column containing the intersections no longer matches the other two columns. It looks like the order got mixed up...

ddf2 = ddf.reset_index().set_index('index')
ddf2['result'] = newCol

I would expect a dask dataframe with the following columns

col_1:{1587004, 1587005, 1587006, 1587007}
col_2:{1587004, 1587005}
col_3:{1587004, 1587005}

A fully working solution would be appreciated, but even some insight into how map_partitions works would already help me a lot :)

Update: thanks to MRocklin I figured it out. For future me, or others stumbling upon this question:

ddf = ddf.assign(
    new_col=ddf.map_partitions(
        lambda df: df.apply(
            lambda row: intersect_sets(row, "col_1", "col_2"),
            axis=1),
        meta=str)
)
df = ddf.compute()
asked Feb 02 '26 13:02 by Ivo Leist

1 Answer

If you have a function that works on pandas dataframes:

def f(df: pandas.DataFrame) -> pandas.Series:
    return df.apply(...)

Then you can map this function across your partitions:

df['new'] = df.map_partitions(f)

I think that your problem is that you've called a compute here needlessly, and so you're trying to push a pandas dataframe into a dask dataframe.

# Don't do this
new = df.map_partitions(f).compute() 
df['new'] = new  # tries to put a pandas dataframe into a dask dataframe
answered Feb 04 '26 02:02 by MRocklin

