I have 3 Pandas dataframes
df_a = pd.DataFrame(data={
'id': [1, 5, 3, 2],
'ts': [3, 5, 11, 14],
'other_cols': ['...'] * 4
})
df_b = pd.DataFrame(data={
'id': [2, 1, 3],
'ts': [7, 8, 15],
'other_cols': ['...'] * 3
})
df_c = pd.DataFrame(data={
'id': [154, 237, 726, 814, 528, 237, 248, 514],
'ts': [1, 2, 4, 6, 9, 10, 12, 13],
'other_cols': ['...'] * 8
})
Here is the problem I need to solve.
For each id in df_a, find the corresponding id in df_b and their timestamps. Let's call them ts_a and ts_b. Then take the rows of df_c whose ts lies between min(ts_a, ts_b) and max(ts_a, ts_b) and calculate some custom function on these rows. This function can be a pd function (95% of the time), but it can be any Python function. Here are the selected rows (id, ts) from df_c for each id:
- id 1: [726, 4], [814, 6]
- id 2: [528, 9], [237, 10], [248, 12], [514, 13]
- id 3: [248, 12], [514, 13]

The output does not really matter, so anything that can map id to f(rows for that id) would do the job.
For example, if I need to apply a simple len function to the selected rows, I get the following results:
| id | res |
|---|---|
| 1 | 2 |
| 2 | 4 |
| 3 | 2 |
If my function is max(ts) - min(ts), the results are:
| id | res |
|---|---|
| 1 | 2 = 6 - 4 |
| 2 | 4 = 13 - 9 |
| 3 | 1 = 13 - 12 |
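To make the expected output concrete, here is a deliberately slow brute-force version of what I mean (just for illustration, not something I can run on the real data; naive_map is simply a name for this sketch):

import pandas as pd

def naive_map(df_a, df_b, df_c, func):
    # pair up ts_a and ts_b for every id present in both df_a and df_b
    merged = df_a.merge(df_b, on="id", suffixes=("_a", "_b"))
    out = {}
    for row in merged.itertuples(index=False):
        lo, hi = min(row.ts_a, row.ts_b), max(row.ts_a, row.ts_b)
        # all df_c rows whose ts falls inside [lo, hi]
        rows_c = df_c[(df_c["ts"] >= lo) & (df_c["ts"] <= hi)]
        out[row.id] = func(rows_c)
    return out

naive_map(df_a, df_b, df_c, len)
# maps 1 -> 2, 2 -> 4, 3 -> 2
naive_map(df_a, df_b, df_c, lambda rows: rows["ts"].max() - rows["ts"].min())
# maps 1 -> 2, 2 -> 4, 3 -> 1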
Here are the assumptions on dataframes:
- ids in each corresponding table are unique
- each dataframe is sorted by ts
- there may be an id in df_a which does not exist in df_b and vice versa (but the percentage of missed ids is less than 1%)

My working solutions
Attempt 1
- Build a dictionary id -> ts from df_b. Linear in terms of the length of df_b.
- Collect the matching ts, other_cols rows from df_c. Linear in terms of df_c as it is already sorted by ts.
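A simplified sketch of this idea (the bisect range lookup here is just for illustration; my real code walks the ts-sorted df_c linearly, but the structure is the same):

import bisect

def attempt_1(df_a, df_b, df_c, func):
    b_ts_by_id = dict(zip(df_b["id"], df_b["ts"]))   # id -> ts, linear in df_b
    c_ts = df_c["ts"].to_list()                      # df_c is already sorted by ts
    results = {}
    for a_id, ts_a in zip(df_a["id"], df_a["ts"]):
        if a_id not in b_ts_by_id:                   # <1% of ids have no match
            continue
        ts_b = b_ts_by_id[a_id]
        lo, hi = min(ts_a, ts_b), max(ts_a, ts_b)
        start = bisect.bisect_left(c_ts, lo)
        stop = bisect.bisect_right(c_ts, hi)
        results[a_id] = func(df_c.iloc[start:stop])
    return results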
Attempt 2
- df = pd.concat([df_a, df_b, df_c]).sort_values(by='ts').reset_index(drop=True)
- Keep a dictionary seen_ids (id -> index) where you put ids from tables A/B. If you see an id that is already in this dictionary, take df.iloc[index_1:index_2], filter the rows to only those from C and apply the function.

Both attempts work correctly and run in log-linear time, but for my data they take ~20-30 minutes to run, which is bearable but not ideal. On top of this, there is the issue of the additional memory required to store the extra data.
My question to Pandas gurus
Can this be achieved with pure Pandas and be more efficient than my custom implementation?
Here is my latest attempt. I think it is pretty fast but of course the speed depends entirely on the contents of the tables you try it on. Let me know how it works for you.
Synthetic data generation:
import random
import pandas as pd
a_len = int(1e7)
c_len = int(1e8)
df_a = pd.DataFrame(data={
'id': random.sample(population=range(a_len), k=int(a_len * .99)),
'ts': random.choices(population=range(int(a_len * 10)), k=int(a_len * .99)),
'other_cols': ['...'] * int(a_len * .99)
})
df_a.sort_values(by=["ts"], inplace=True)
df_b = pd.DataFrame(data={
'id': random.sample(population=range(a_len), k=int(a_len * .99)),
'ts': random.choices(population=range(int(a_len * 10)), k=int(a_len * .99)),
'other_cols': ['...'] * int(a_len * .99)
})
df_b.sort_values(by=["ts"], inplace=True)
df_c = pd.DataFrame(data={
'id': range(c_len),
'ts': random.choices(population=range(int(a_len * 1e7)), k=c_len),
'other_cols': ['...'] * c_len
})
df_c.sort_values(by=["ts"], inplace=True)
Some stats on an example generation of these tables:
size_by_id = df_c_labeled.groupby(by=["id"]).size()
size_by_id.max()
>>> 91
size_by_id.median()
>>> 26.0
The algorithm, utilizing pandas.IntervalIndex:
import functools
import numpy as np
import pandas as pd
def cartesian_product(*arrays):
"""https://stackoverflow.com/a/11146645/7059681"""
la = len(arrays)
dtype = np.result_type(*arrays)
arr = np.empty([len(a) for a in arrays] + [la], dtype=dtype)
for i, a in enumerate(np.ix_(*arrays)):
arr[...,i] = a
return arr.reshape(-1, la).T
# inner join on id
df_ts = pd.merge(
left=df_a[["id", "ts"]],
right=df_b[["id", "ts"]],
how="inner",
on="id",
suffixes=["_a", "_b"]
)
# a = min ts, b = max ts
df_ts["ts_a"], df_ts["ts_b"] = (
df_ts[["ts_a", "ts_b"]].min(axis=1),
df_ts[["ts_a", "ts_b"]].max(axis=1),
)
a_min = df_ts["ts_a"].min()
b_max = df_ts["ts_b"].max()
interval_index = pd.IntervalIndex.from_arrays(
left=df_ts["ts_a"],
right=df_ts["ts_b"],
closed="both",
)
# rename to avoid collisions
df_c.rename(columns={"id": "id_c", "ts": "ts_c"}, inplace=True)
ts_c = df_c["ts_c"].to_numpy()
df_c_idxs_list, df_ts_idxs_list = [], []
# the first item in ts_c that is at least equal to a_min
c_lo = 0
while ts_c[c_lo] < a_min:
    c_lo += 1
c_idx = c_lo
c_hi = len(ts_c)
while c_lo < c_hi and ts_c[c_lo] <= b_max:
    # the index of the next greatest ts in ts_c
    # depending on how many duplicate values you have in ts_c,
    # it may be faster to binary search instead of incrementing one by one
    # c_idx = bisect.bisect_right(a=ts_c, x=ts_c[c_lo], lo=c_idx, hi=c_hi)
    while c_idx < c_hi and ts_c[c_idx] == ts_c[c_lo]:
        c_idx += 1
    # the indices of the intervals containing ts_c[c_lo]
    unique_ts_idxs = np.where(interval_index.contains(ts_c[c_lo]))[0]
    # all the df_c indices whose ts equals ts_c[c_lo]
    unique_c_idxs = df_c.iloc[c_lo: c_idx].index
    # all the pairs of these indices
    c_idxs, ts_idxs = cartesian_product(unique_c_idxs, unique_ts_idxs)
    df_c_idxs_list.append(c_idxs)
    df_ts_idxs_list.append(ts_idxs)
    c_lo = c_idx
df_c_idxs = np.concatenate(df_c_idxs_list)
df_ts_idxs = np.concatenate(df_ts_idxs_list)
df_c_labeled = pd.concat(
[
df_ts.loc[df_ts_idxs, :].reset_index(drop=True),
df_c.loc[df_c_idxs, :].reset_index(drop=True)
],
axis=1
)
print(df_c_labeled)
id ts_a ts_b id_c ts_c other_cols
0 1 3 8 726 4 ...
1 1 3 8 814 6 ...
2 2 7 14 528 9 ...
3 2 7 14 237 10 ...
4 3 11 15 248 12 ...
5 2 7 14 248 12 ...
6 3 11 15 514 13 ...
7 2 7 14 514 13 ...
Now we can just do some groupby stuff:
id_groupby = df_c_labeled.groupby(by="id")
id_groupby["ts_c"].size()
id
1 2
2 4
3 2
Name: ts_c, dtype: int64
id_groupby["ts_c"].max() - id_groupby["ts_c"].min()
id
1 2
2 4
3 1
Name: ts_c, dtype: int64
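And since the function can be any Python callable, the same labeled frame also works with groupby(...).apply (my_custom_func below is just a stand-in for whatever logic you need):

def my_custom_func(group):
    # `group` holds all the matched df_c rows (plus ts_a/ts_b) for one id
    return group["ts_c"].max() - group["ts_c"].min()

id_groupby.apply(my_custom_func)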
This problem can be efficiently solved in two parts.
The first part consists of finding the matching rows of df_a and df_b as well as the range of rows of df_c based on ts. This can be done very quickly using a parallel Numba implementation (while consuming only a fraction of the input datasets).
The second part consists of computing the user-defined functions, which are possibly Pandas ones. This latter operation is inherently slow and memory expensive. Indeed, Pandas functions operate mainly on dataframes/series, which are not efficient here. Iterating over a Pandas dataframe containing generic pure-Python types is known to be painfully slow. Building many small dataframes is slow (there is a pretty high overhead to even create an empty dataframe) but memory efficient. Creating one big dataframe is significantly faster, but it is clearly not memory efficient since it forces many rows to be replicated (dozens or even hundreds of times, due to the number of items in df_c to extract per id of df_a/df_b). In the end, the fastest Pandas solution will be far slower than the optimal time (by at least one order of magnitude). Also note that parallelism will barely help here, because the GIL prevents multithreaded code from being fast and pickling prevents multiprocessing from being fast. In addition, tools like Numba or Cython cannot help with user-defined generic pure-Python functions. AFAIK, the only way to make this part really fast and memory efficient is simply not to apply generic Pandas functions on huge dataframes or generic pure-Python functions at all.
The first part can be done with parallel Numba (JIT-compiled) code. While Numba does not support Pandas directly, Pandas mainly uses Numpy internally, which Numba supports well. The computation can be split into many chunks computed efficiently in parallel. The main idea is to build a fast index of df_b so as to merge df_a and df_b in linear time, and to use a binary search to find the ranges of matching rows in df_c. The resulting code is very fast. The catch is that the output format is not very convenient for part 2. Here is the code:
import numba as nb
import numpy as np
import pandas as pd
# Feel free to change the signature based on the actual types of your dataframes. Smaller types take less memory and tend to be faster because of that.
@nb.njit('(int64[::1], int64[::1], int64[::1], int64[::1], int64[::1])', parallel=True)
def find_matching_rows(df_a_id, df_a_ts, df_b_id, df_b_ts, df_c_ts):
    # Build an index of `df_b` IDs
    b_index = {df_b_id[i]: i for i in range(df_b_id.size)}
    # Mark the `df_a` rows found in `df_b` (parallel)
    found = np.empty(df_a_id.size, np.bool_)
    for a_row in nb.prange(df_a_id.size):
        a_id = df_a_id[a_row]
        found[a_row] = a_id in b_index
    # Count the number of valid rows (parallel)
    count = 0
    for a_row in nb.prange(df_a_id.size):
        count += found[a_row]
    # Count the number of valid items per chunk and
    # the offsets of the output of each chunk (mainly parallel)
    chunk_size = 32768
    chunk_count = (found.size + chunk_size - 1) // chunk_size
    count_by_chunk = np.empty(chunk_count, np.int32)
    for i in nb.prange(chunk_count):
        count_by_chunk[i] = np.sum(found[i*chunk_size:(i+1)*chunk_size])
    out_offsets = np.zeros(chunk_count + 1, np.int32)
    for i in range(chunk_count):
        out_offsets[i+1] = out_offsets[i] + count_by_chunk[i]
    assert out_offsets[chunk_count] == count
    # Main chunk-based computation (parallel)
    a_rows = np.empty(count, np.int32)       # `df_a` indices
    b_rows = np.empty(count, np.int32)       # `df_b` indices
    c_rows = np.empty((count, 2), np.int32)  # Start/end indices
    for chunk_id in nb.prange(chunk_count):
        a_row_start = chunk_id * chunk_size
        a_row_end = min(df_a_id.size, a_row_start + chunk_size)
        offset = out_offsets[chunk_id]
        for a_row in range(a_row_start, a_row_end):
            # Discard ids of `df_a` not in `df_b`
            if not found[a_row]:
                continue
            a_id = df_a_id[a_row]
            b_row = b_index[a_id]
            ts_a, ts_b = df_a_ts[a_row], df_b_ts[b_row]
            ts_min, ts_max = min(ts_a, ts_b), max(ts_a, ts_b)
            c_start_row = np.searchsorted(df_c_ts, ts_min, 'left')  # Included
            c_end_row = np.searchsorted(df_c_ts, ts_max, 'right')   # Excluded
            # If there is no row found in `df_c`
            if c_start_row >= c_end_row:
                c_start_row = c_end_row = -1  # Not discarded (may be useful)
            # Save results
            a_rows[offset] = a_row
            b_rows[offset] = b_row
            c_rows[offset, 0] = c_start_row
            c_rows[offset, 1] = c_end_row
            offset += 1
    return (a_rows, b_rows, c_rows)
Here is the way to call the function:
a_rows, b_rows, c_rows = find_matching_rows(
df_a['id'].values, df_a['ts'].values,
df_b['id'].values, df_b['ts'].values,
df_c['ts'].values
)
As seen before, generic approaches are inherently inefficient (for both speed and memory usage). One solution is to tweak your operations so they are applied directly inside the previous Numba code. This would make the overall implementation both very fast (i.e. parallel and JIT-compiled) and memory efficient (i.e. computed on the fly, with no need for a huge temporary dataframe). That being said, Numba does not support generic pure-Python object types nor Pandas functions, so this can require some non-trivial code rework depending on your actual dataframes.
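For instance, if the custom function happened to be the max(ts) - min(ts) example from the question, it could be computed straight from the (start, end) ranges returned by find_matching_rows, without materializing any rows. A sketch (ts_span_per_match is just an illustrative name; it relies on df_c being sorted by ts, as the searchsorted above already requires):

@nb.njit('(int32[:,::1], int64[::1])', parallel=True)
def ts_span_per_match(c_rows, df_c_ts):
    n = c_rows.shape[0]
    res = np.empty(n, np.int64)
    for i in nb.prange(n):
        start, end = c_rows[i, 0], c_rows[i, 1]
        if start == -1:
            res[i] = -1  # no df_c rows in the ts range
        else:
            # df_c_ts is sorted, so min/max are the endpoints of the range
            res[i] = df_c_ts[end - 1] - df_c_ts[start]
    return res

span = ts_span_per_match(c_rows, df_c['ts'].values)
res_by_id = pd.Series(span, index=df_a['id'].values[a_rows])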
The inefficient alternative is to create a big temporary dataframe from the index arrays returned by find_matching_rows. Here is an example of Numba code to do that:
@nb.njit('(int32[::1], int32[::1], int32[:,::1])')
def build_df_index(a_rows, b_rows, c_rows):
    n = a_rows.size
    # Count the total number of df_c rows to be extracted
    count = 0
    for i in range(n):
        count += c_rows[i, 1] - c_rows[i, 0]
    new_a_rows = np.empty(count, np.int32)
    new_b_rows = np.empty(count, np.int32)
    new_c_rows = np.empty(count, np.int32)
    offset = 0
    for i in range(n):
        for j in range(c_rows[i, 1] - c_rows[i, 0]):
            new_a_rows[offset] = a_rows[i]
            new_b_rows[offset] = b_rows[i]
            new_c_rows[offset] = c_rows[i, 0] + j
            offset += 1
    return (new_a_rows, new_b_rows, new_c_rows)
The resulting index arrays can be used to create the final dataframe, for example with df_a.iloc[new_a_rows], df_b.iloc[new_b_rows] and df_c.iloc[new_c_rows]. If your actual dataframes contain only uniform types, or ones that Numba supports, then you can generate this temporary dataframe directly with Numba (significantly faster than Pandas iloc, especially if done in parallel).
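A rough sketch of that last step (assuming df_c still has its original id/ts/other_cols columns from the question; the add_suffix calls are just to disambiguate the column names):

new_a_rows, new_b_rows, new_c_rows = build_df_index(a_rows, b_rows, c_rows)

df_labeled = pd.concat(
    [
        df_a.iloc[new_a_rows][['id', 'ts']].add_suffix('_a').reset_index(drop=True),
        df_b.iloc[new_b_rows][['ts']].add_suffix('_b').reset_index(drop=True),
        df_c.iloc[new_c_rows].add_suffix('_c').reset_index(drop=True),
    ],
    axis=1
)

# then apply the custom function per id as usual
gb = df_labeled.groupby('id_a')['ts_c']
print(gb.size())
print(gb.max() - gb.min())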