How to efficiently match a list of timestamps to a list of timestamp ranges in Pandas?

I have 3 Pandas dataframes

df_a = pd.DataFrame(data={
  'id': [1, 5, 3, 2],
  'ts': [3, 5, 11, 14],
  'other_cols': ['...'] * 4
})

df_b = pd.DataFrame(data={
  'id': [2, 1, 3],
  'ts': [7, 8, 15],
  'other_cols': ['...'] * 3
})

df_c = pd.DataFrame(data={
  'id': [154, 237, 726, 814, 528, 237, 248, 514],
  'ts': [1, 2, 4, 6, 9, 10, 12, 13],
  'other_cols': ['...'] * 8
})

Here is the problem I need to solve.

  • for every id in df_a, find the corresponding id in df_b and their timestamps; call them ts_a and ts_b.
  • find all the rows in df_c whose ts lies between min(ts_a, ts_b) and max(ts_a, ts_b) and calculate some custom function on these rows. This function is a Pandas function 95% of the time, but it can be any Python function.

Here are the matching rows of df_c for each id, as (id, ts) pairs:

  • id 1: [726, 4], [814, 6]
  • id 2: [528, 9], [237, 10], [248, 12], [514, 13]
  • id 3: [248, 12], [514, 13]
  • id 5: can be found only in A, but not in B, so nothing should be done

The output does not really matter, so anything that can map id to f(rows for that id) would do the job.

For example, let's assume I need to apply a simple len function to the results; I would get the following:

id  res
1   2
2   4
3   2

If my function is max(ts) - min(ts), the results are:

id  res
1   2 = 6 - 4
2   4 = 13 - 9
3   1 = 13 - 12

Here are the assumptions on dataframes:

  • ids within each table are unique
  • each dataframe is sorted by ts
  • there might be an id in df_a which does not exist in df_b and vice versa (but the percentage of missing ids is less than 1%)
  • tables A/B are on the order of tens of millions of rows, table C is on the order of hundreds of millions
  • although in theory there can be any number of rows between two timestamps, empirically the median count is a two-digit number and the maximum is slightly more than a thousand

My working solutions

Attempt 1

  • create a dictionary id -> ts from df_b. Linear in the length of df_b
  • create a sorted list of (ts, other_cols) from df_c. Linear in the length of df_c, as it is already sorted by ts
  • iterate over df_a; for each id look up its ts in the dictionary, then do two binary searches in the sorted list to find the edges of the data to be analyzed, and apply the function (see the sketch below)
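
A minimal sketch of Attempt 1 (my_func is a placeholder for the custom function; both interval ends are treated as inclusive, matching the examples above):

import numpy as np

def my_func(rows):
    # placeholder for the custom function
    return len(rows)

b_ts_by_id = dict(zip(df_b['id'], df_b['ts']))  # id -> ts lookup built from df_b
c_ts = df_c['ts'].to_numpy()                    # df_c is already sorted by ts

results = {}
for a_id, ts_a in zip(df_a['id'], df_a['ts']):
    ts_b = b_ts_by_id.get(a_id)
    if ts_b is None:                                  # id exists only in df_a -> skip
        continue
    lo, hi = min(ts_a, ts_b), max(ts_a, ts_b)
    start = np.searchsorted(c_ts, lo, side='left')    # first row with ts >= lo
    end = np.searchsorted(c_ts, hi, side='right')     # one past the last row with ts <= hi
    results[a_id] = my_func(df_c.iloc[start:end])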

Attempt 2

  • combine all the dataframes into one and order by ts: df = pd.concat([df_a, df_b, df_c]).sort_values(by='ts').reset_index(drop=True)
  • iterate over this dataframe in a sliding-window fashion, maintaining a dictionary seen_ids (id -> index) into which ids from tables A/B are put. If an id is already in this dictionary, take df.iloc[index_1:index_2], filter it to only the rows from C and apply the function (see the sketch below)
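
A rough sketch of Attempt 2 (my_func is again a placeholder; a src column is added so that rows coming from df_c can be filtered inside each window):

import pandas as pd

def my_func(rows):
    # placeholder for the custom function
    return len(rows)

# tag each source so df_c rows can be told apart inside a window
df = pd.concat(
    [df_a.assign(src='a'), df_b.assign(src='b'), df_c.assign(src='c')]
).sort_values(by='ts').reset_index(drop=True)

results = {}
seen_ids = {}                                    # id -> position of its first A/B occurrence
for pos, (row_id, src) in enumerate(zip(df['id'], df['src'])):
    if src == 'c':
        continue
    if row_id in seen_ids:                       # second occurrence of an A/B id closes the window
        window = df.iloc[seen_ids[row_id]:pos + 1]
        results[row_id] = my_func(window[window['src'] == 'c'])
    else:
        seen_ids[row_id] = pos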

Both attempts work correctly and run in log-linear time, but for my data they take ~20-30 minutes to run, which is bearable but not ideal. On top of this, there is the issue of the additional memory required to store auxiliary data.

My question to Pandas gurus

Can this be achieved with pure Pandas and be more efficient than my custom implementation?

2 Answers

Here is my latest attempt. I think it is pretty fast but of course the speed depends entirely on the contents of the tables you try it on. Let me know how it works for you.

Synthetic data generation:

import random
import pandas as pd


a_len = int(1e7)
c_len = int(1e8)

df_a = pd.DataFrame(data={
  'id': random.sample(population=range(a_len), k=int(a_len * .99)),
  'ts': random.choices(population=range(int(a_len * 10)), k=int(a_len * .99)),
  'other_cols': ['...'] * int(a_len * .99)
})
df_a.sort_values(by=["ts"], inplace=True)

df_b = pd.DataFrame(data={
  'id': random.sample(population=range(a_len), k=int(a_len * .99)),
  'ts': random.choices(population=range(int(a_len * 10)), k=int(a_len * .99)),
  'other_cols': ['...'] * int(a_len * .99)
})
df_b.sort_values(by=["ts"], inplace=True)

df_c = pd.DataFrame(data={
  'id': range(c_len),
  'ts': random.choices(population=range(int(a_len * 1e7)), k=c_len),
  'other_cols': ['...'] * c_len
})
df_c.sort_values(by=["ts"], inplace=True)

Some stats from an example generation of these tables (using df_c_labeled, which is built below) are:

size_by_id = df_c_labeled.groupby(by=["id"]).size()

size_by_id.max()
>>> 91

size_by_id.median()
>>> 26.0

The algorithm, utilizing pandas.IntervalIndex:

import functools

import numpy as np
import pandas as pd


def cartesian_product(*arrays):
    """https://stackoverflow.com/a/11146645/7059681"""
    la = len(arrays)
    dtype = np.result_type(*arrays)
    arr = np.empty([len(a) for a in arrays] + [la], dtype=dtype)
    for i, a in enumerate(np.ix_(*arrays)):
        arr[...,i] = a
    return arr.reshape(-1, la).T


# inner join on id
df_ts = pd.merge(
    left=df_a[["id", "ts"]],
    right=df_b[["id", "ts"]],
    how="inner",
    on="id",
    suffixes=["_a", "_b"]
)

# a = min ts, b = max ts
df_ts["ts_a"], df_ts["ts_b"] = (
    df_ts[["ts_a", "ts_b"]].min(axis=1),
    df_ts[["ts_a", "ts_b"]].max(axis=1),
)

a_min = df_ts["ts_a"].min()
b_max = df_ts["ts_b"].max()

interval_index = pd.IntervalIndex.from_arrays(
    left=df_ts["ts_a"],
    right=df_ts["ts_b"],
    closed="both",
)

# rename to avoid collisions
df_c.rename(columns={"id": "id_c", "ts": "ts_c"}, inplace=True)

ts_c = df_c["ts_c"].to_numpy()

df_c_idxs_list, df_ts_idxs_list = [], []

# the first item in ts_c that is at least equal to a_min
c_lo = 0
while ts_c[c_lo] < a_min:
    c_lo += 1
c_idx = c_lo
c_hi = len(ts_c)

while c_lo < c_hi and ts_c[c_lo] <= b_max:
    # the index of the next greatest ts in ts_c
    # depending on how many duplicate values you have in ts_c,
    # it may be faster to binary search instead of incrementing one by one
    # c_idx = bisect.bisect_right(a=ts_c, x=ts_c[c_lo], lo=c_idx, hi=c_hi)
    while c_idx < c_hi and ts_c[c_idx] == ts_c[c_lo]:
        c_idx += 1

    # the indices of the intervals containing ts_c[c_lo]
    unique_ts_idxs = np.where(interval_index.contains(ts_c[c_lo]))[0]

    # all the df_c indices with ts equal to ts_c[c_lo]
    unique_c_idxs = df_c.iloc[c_lo: c_idx].index
    
    # all the pairs of these indices
    c_idxs, ts_idxs = cartesian_product(unique_c_idxs, unique_ts_idxs)

    df_c_idxs_list.append(c_idxs)
    df_ts_idxs_list.append(ts_idxs)

    c_lo = c_idx

df_c_idxs = np.concatenate(df_c_idxs_list)
df_ts_idxs = np.concatenate(df_ts_idxs_list)

df_c_labeled = pd.concat(
    [
        df_ts.loc[df_ts_idxs, :].reset_index(drop=True),
        df_c.loc[df_c_idxs, :].reset_index(drop=True)
    ],
    axis=1
)

print(df_c_labeled)
   id  ts_a  ts_b  id_c  ts_c other_cols
0   1     3     8   726     4        ...
1   1     3     8   814     6        ...
2   2     7    14   528     9        ...
3   2     7    14   237    10        ...
4   3    11    15   248    12        ...
5   2     7    14   248    12        ...
6   3    11    15   514    13        ...
7   2     7    14   514    13        ...

Now we can just do some groupby stuff:

id_groupby = df_c_labeled.groupby(by="id")

id_groupby["ts_c"].size()
id
1    2
2    4
3    2
Name: ts_c, dtype: int64

id_groupby["ts_c"].max() - id_groupby["ts_c"].min()
id
1    2
2    4
3    1
Name: ts_c, dtype: int64
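
If the function is not a built-in aggregation, the same groupby object also accepts an arbitrary Python callable through apply (slower, but fully general). For example, the max - min computation above could equally be written as:

# arbitrary Python function per group (illustrative; slower than built-in aggregations)
id_groupby.apply(lambda g: g["ts_c"].max() - g["ts_c"].min())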

Overview

This problem can be efficiently solved in two parts.

The first part consists of finding the matching rows in df_a and df_b, as well as the range of rows of df_c based on ts. This can be done very quickly using a parallel Numba implementation (while consuming only a fraction of the memory of the input datasets).

The second part consists of computing the user-defined functions, which are possibly Pandas ones. This latter operation is inherently slow and memory expensive. Indeed, Pandas functions operate mainly on dataframes/series, which are not efficient here. Iterating over a Pandas dataframe containing generic pure-Python types is known to be painfully slow. Building many small dataframes is slow (there is a pretty high overhead just to create an empty dataframe) but memory efficient. Creating one big dataframe is significantly faster, but it is clearly not memory efficient, since it forces many rows to be replicated (dozens or even hundreds of times, due to the number of df_c items to extract per id of df_a/df_b). In the end, the fastest Pandas solution will be far slower than the optimal time (by at least one order of magnitude). Also note that parallelism will barely help here, because the GIL prevents multithreaded code from being fast and pickling prevents multiprocessing from being fast. In addition, tools like Numba or Cython cannot help with user-defined generic pure-Python functions. AFAIK, the only way to make this part really fast and memory efficient is simply not to apply generic Pandas functions on huge dataframes or generic pure-Python functions.


Part 1: Extracting dataframe rows

The first part can be done using parallel Numba (JIT compiler) code. While Numba does not support Pandas directly, Pandas mainly uses Numpy internally, which is well supported by Numba. The computation can be split into many chunks that are computed efficiently in parallel. The main idea is to build a fast index of df_b so as to merge df_a and df_b in linear time, and to use a binary search to find the ranges of matching rows in df_c. The resulting code is very fast. The catch is that the output format is not very efficient for part 2. Here is the code:

import numba as nb
import numpy as np
import pandas as pd

# Feel free to change the signature based on the actual types of your dataframes.
# Smaller types take less memory and tend to be faster because of that.
@nb.njit('(int64[::1], int64[::1], int64[::1], int64[::1], int64[::1])', parallel=True)
def find_matching_rows(df_a_id, df_a_ts, df_b_id, df_b_ts, df_c_ts):
    # Build an index of `df_b` IDs

    b_index = {df_b_id[i]: i for i in range(df_b_id.size)}


    # Mark the `df_a` rows found in `df_b` (parallel)

    found = np.empty(df_a_id.size, np.bool_)
    for a_row in nb.prange(df_a_id.size):
        a_id = df_a_id[a_row]
        found[a_row] = a_id in b_index


    # Count the number of valid rows (parallel)

    count = 0
    for a_row in nb.prange(df_a_id.size):
        count += found[a_row]


    # Count the number of valid items per chunk and
    # the offsets of the output of each chunk (mainly parallel)

    chunk_size = 32768
    chunk_count = (found.size + chunk_size - 1) // chunk_size
    count_by_chunk = np.empty(chunk_count, np.int32)

    for i in nb.prange(chunk_count):
        count_by_chunk[i] = np.sum(found[i*chunk_size:(i+1)*chunk_size])

    out_offsets = np.zeros(chunk_count + 1, np.int32)

    for i in range(chunk_count):
        out_offsets[i+1] = out_offsets[i] + count_by_chunk[i]

    assert out_offsets[chunk_count] == count


    # Main chunk-based computation (parallel)

    a_rows = np.empty(count, np.int32)              # `df_a` indices
    b_rows = np.empty(count, np.int32)              # `df_b` indices
    c_rows = np.empty((count, 2), np.int32)         # Start/end indices

    for chunk_id in nb.prange(chunk_count):
        a_row_start = chunk_id * chunk_size
        a_row_end = min(df_a_id.size, a_row_start + chunk_size)
        offset = out_offsets[chunk_id]
        for a_row in range(a_row_start, a_row_end):
            # Discard ids of `df_a` not in `df_b`
            if not found[a_row]:
                continue
            a_id = df_a_id[a_row]
            b_row = b_index[a_id]
            ts_a, ts_b = df_a_ts[a_row], df_b_ts[b_row]
            ts_min, ts_max = min(ts_a, ts_b), max(ts_a, ts_b)
            c_start_row = np.searchsorted(df_c_ts, ts_min, 'left')  # Included
            c_end_row = np.searchsorted(df_c_ts, ts_max, 'right')   # Excluded
            # If there is no row found in `df_c`
            if c_start_row >= c_end_row:
                c_start_row = c_end_row = -1                        # Not discarded (may be useful)
            # Save results
            a_rows[offset] = a_row
            b_rows[offset] = b_row
            c_rows[offset, 0] = c_start_row
            c_rows[offset, 1] = c_end_row
            offset += 1

    return (a_rows, b_rows, c_rows)

Here is the way to call the function:

a_rows, b_rows, c_rows = find_matching_rows(
    df_a['id'].values, df_a['ts'].values,
    df_b['id'].values, df_b['ts'].values,
    df_c['ts'].values
)

Part 2: dataframe & user-defined functions

As seen before, generic approaches are inherently inefficient (in terms of both speed and memory usage). One solution is to tweak your operations so that they can be applied directly inside the previous Numba code. This would make the overall implementation both very fast (i.e. parallel and JIT-compiled) and memory efficient (i.e. computed on the fly -- no need for a huge temporary dataframe). That being said, Numba does not support generic pure-Python object types nor Pandas functions, so this can require some non-trivial code rework depending on your actual dataframe.
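
For instance, here is a hedged sketch of such a fused computation, assuming the custom function is simply max(ts) - min(ts) over the matched df_c rows (ts_span_per_match is an illustrative name; any function expressible with Numba-supported types could be substituted):

# Sketch: compute max(ts) - min(ts) of the df_c slice of each match, in parallel.
# Returns -1 for matches with no df_c row (the -1/-1 sentinel above).
@nb.njit('(int32[:,::1], int64[::1])', parallel=True)
def ts_span_per_match(c_rows, df_c_ts):
    n = c_rows.shape[0]
    res = np.empty(n, np.int64)
    for i in nb.prange(n):
        start, end = c_rows[i, 0], c_rows[i, 1]
        if start < 0 or start >= end:
            res[i] = -1
        else:
            # df_c is sorted by ts, so the slice endpoints give the min and max
            res[i] = df_c_ts[end - 1] - df_c_ts[start]
    return res

spans = ts_span_per_match(c_rows, df_c['ts'].values)  # spans[k] corresponds to df_a row a_rows[k]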

The inefficient alternative is to create a big temporary dataframe from the index arrays previously created by find_matching_rows. Here is an example of Numba code to do that:

@nb.njit('(int32[::1], int32[::1], int32[:,::1])')
def build_df_index(a_rows, b_rows, c_rows):
    n = a_rows.size

    # Count the total number of df_c rows to be extracted
    count = 0
    for i in range(n):
        count += c_rows[i, 1] - c_rows[i, 0]

    new_a_rows = np.empty(count, np.int32)
    new_b_rows = np.empty(count, np.int32)
    new_c_rows = np.empty(count, np.int32)
    offset = 0

    for i in range(n):
        for j in range(c_rows[i, 1] - c_rows[i, 0]):
            new_a_rows[offset] = a_rows[i]
            new_b_rows[offset] = b_rows[i]
            new_c_rows[offset] = c_rows[i,0] + j
            offset += 1

    return (new_a_rows, new_b_rows, new_c_rows)

The resulting index arrays can be used to create the final dataframe, for example with df_a.iloc[new_a_rows], df_b.iloc[new_b_rows] and df_c.iloc[new_c_rows]. If your actual dataframes contain only uniform types, or ones that Numba supports, then you can directly generate this temporary dataframe with Numba (significantly faster than Pandas iloc, especially if done in parallel).
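
As a rough illustration (df_labeled and the _a/_b/_c suffixes are just naming choices here), the temporary dataframe and the final grouping could look like this:

new_a_rows, new_b_rows, new_c_rows = build_df_index(a_rows, b_rows, c_rows)

df_labeled = pd.concat(
    [
        df_a.iloc[new_a_rows].reset_index(drop=True).add_suffix('_a'),
        df_b.iloc[new_b_rows].reset_index(drop=True).add_suffix('_b'),
        df_c.iloc[new_c_rows].reset_index(drop=True).add_suffix('_c'),
    ],
    axis=1
)

# equivalent of the len example from the question
df_labeled.groupby('id_a')['ts_c'].size()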
