I'm trying to write a method for a features pipeline that returns a polars expression. The method should take a column name as a string and an integer number of days. I want to perform a rolling count on that column using a window equal to the number of days.
There doesn't seem to be a rolling_count expression, so I attempted to use rolling_sum_by to no avail.
def temporal_rolling_count(col: str, days: int) -> pl.Expr:
    return (
        pl.lit(1)
        .rolling_sum_by(window_size=f"{days}d", by="date_time")
        .over(col)
        .fill_null(0)
    )
I also tried this version, which was closer but still didn't work in all cases:
def temporal_rolling_count(col: str, days: int) -> pl.Expr:
    return (
        pl.col(col)
        .cum_count()
        .over(col, (pl.col("date_time") - pl.col("date_time").min()).dt.days() % days == 0)
        .fill_null(0)
    )
Is there any way to achieve this by returning an expression? Or will I have to act on the DataFrame directly, maybe by using rolling?
I believe you can do this with rolling_sum_by if you first create a dummy column of ones, which you can drop afterwards.
Example:
import polars as pl

df = pl.from_repr("""
┌───────┬─────────────────────┐
│ group ┆ date_time           │
│ ---   ┆ ---                 │
│ str   ┆ datetime[μs]        │
╞═══════╪═════════════════════╡
│ a     ┆ 2023-01-01 00:00:00 │
│ a     ┆ 2023-01-02 00:00:00 │
│ a     ┆ 2023-01-02 00:00:00 │
│ b     ┆ 2023-01-02 00:00:00 │
│ b     ┆ 2023-01-05 00:00:00 │
│ b     ┆ 2023-01-05 00:00:00 │
│ b     ┆ 2023-01-07 00:00:00 │
└───────┴─────────────────────┘
""")

(
    # Dummy column of ones to sum over; dropped again at the end.
    df.with_columns(ones = pl.lit(1))
    .with_columns(
        rolling_count = pl.col('ones')
        .rolling_sum_by(window_size='2d', by='date_time', closed='both')
        .over('group'))
    .drop('ones')
)
Result:
shape: (7, 3)
┌───────┬─────────────────────┬───────────────┐
│ group ┆ date_time           ┆ rolling_count │
│ ---   ┆ ---                 ┆ ---           │
│ str   ┆ datetime[μs]        ┆ i32           │
╞═══════╪═════════════════════╪═══════════════╡
│ a     ┆ 2023-01-01 00:00:00 ┆ 1             │
│ a     ┆ 2023-01-02 00:00:00 ┆ 2             │
│ a     ┆ 2023-01-02 00:00:00 ┆ 3             │
│ b     ┆ 2023-01-02 00:00:00 ┆ 1             │
│ b     ┆ 2023-01-05 00:00:00 ┆ 1             │
│ b     ┆ 2023-01-05 00:00:00 ┆ 2             │
│ b     ┆ 2023-01-07 00:00:00 ┆ 3             │
└───────┴─────────────────────┴───────────────┘
As per the suggestion from @jqurious, by using .clip I was able to achieve the desired outcome without acting on the DataFrame.
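def temporal_rolling_count(col: str, days: int) -> pl.Expr:
    return (
        # clip(1, 1) forces every value to 1, so the rolling sum is a count.
        pl.col(col).clip(1,1)
        # Newer Polars releases spell this rolling_sum_by(...), as in the question.
        .rolling_sum(window_size=f"{days}d", by="date_time")
        .over(col)
        .fill_null(0)
    )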
EDIT
I also managed to do the same thing while counting only the rows where a condition is true:
def temporal_rolling_count(col: str, days: int) -> pl.Expr:
    return (
        # `condition` is a boolean expression defined elsewhere in the pipeline.
        pl.when(condition).then(1).otherwise(0)
        .rolling_sum(window_size=f"{days}d", by="date_time")
        .over(col)
        .fill_null(0)
    )