I'm trying to aggregate my data by summing it over every 30-second window. I would also like the result to include windows whose sum is zero, which happens when there are no rows in that 30-second interval.
Here's a minimal working example illustrating the result I would like with pandas, and where it falls short with pyspark.
import pandas as pd
import pyspark
from pyspark.sql import functions as F
df = pd.DataFrame(
[
(17, "2017-03-10T15:27:18+00:00"),
(13, "2017-03-10T15:27:29+00:00"),
(25, "2017-03-10T15:27:30+00:00"),
(101, "2017-03-10T15:29:00+00:00"),
(99, "2017-03-10T15:29:29+00:00")
],
columns=["dollars", "timestamp"],
)
df["timestamp"] = pd.to_datetime(df["timestamp"])
print(df)
   dollars                 timestamp
0       17 2017-03-10 15:27:18+00:00
1       13 2017-03-10 15:27:29+00:00
2       25 2017-03-10 15:27:30+00:00
3      101 2017-03-10 15:29:00+00:00
4       99 2017-03-10 15:29:29+00:00
With pandas, we can use resample to bin the data into 30-second windows and then apply the sum function over these windows (note the results for 2017-03-10 15:28:00+00:00 and 2017-03-10 15:28:30+00:00):
desired_result = df.set_index("timestamp").resample("30S").sum()
desired_result
                           dollars
timestamp
2017-03-10 15:27:00+00:00       30
2017-03-10 15:27:30+00:00       25
2017-03-10 15:28:00+00:00        0
2017-03-10 15:28:30+00:00        0
2017-03-10 15:29:00+00:00      200
In pyspark, we can use pyspark.sql.functions.window to group over every 30-second window (adapted, with thanks, from this stack answer), but this misses the windows that contain no rows:
spark: pyspark.sql.session.SparkSession # I expect you to have set up your session...
sdf = spark.createDataFrame(df)
sdf.groupby(
F.window("timestamp", windowDuration="30 seconds", slideDuration="30 seconds")
).agg(F.sum("dollars")).display()  # .display() is Databricks-specific; use .show() in plain PySpark
window                                                                          sum(dollars)
{"start":"2017-03-10T15:27:30.000+0000","end":"2017-03-10T15:28:00.000+0000"}   25
{"start":"2017-03-10T15:27:00.000+0000","end":"2017-03-10T15:27:30.000+0000"}   30
{"start":"2017-03-10T15:29:00.000+0000","end":"2017-03-10T15:29:30.000+0000"}   200
How do I get pyspark to also return results for the time windows where there are no rows (like pandas does)?
You can use timestamp arithmetic, as mentioned in this answer (I recommend you take a look at it, since it goes into more detail). In your case it would be:
from pyspark.sql import functions as F

# assumes an active SparkSession bound to `spark`
seconds = 30

# Floor each timestamp to the start of its 30-second bucket, in epoch seconds.
epoch = (F.col("timestamp").cast("timestamp").cast("bigint") / seconds).cast(
    "bigint"
) * seconds
df = spark.createDataFrame(
[
(17, "2017-03-10T15:27:18+00:00"),
(13, "2017-03-10T15:27:29+00:00"),
(25, "2017-03-10T15:27:30+00:00"),
(101, "2017-03-10T15:29:00+00:00"),
(99, "2017-03-10T15:29:29+00:00"),
],
["dollars", "timestamp"],
).withColumn("epoch", epoch)
# Build a reference frame containing every 30-second bucket between the first and last one.
min_epoch, max_epoch = df.select(F.min("epoch"), F.max("epoch")).first()
ref = spark.range(min_epoch, max_epoch + seconds, seconds).toDF("epoch")
(
    ref.join(df, "epoch", "left")  # keep buckets with no matching rows
    .withColumn("ts_resampled", F.timestamp_seconds("epoch"))
    .groupBy("ts_resampled")
    .sum("dollars")
    .orderBy("ts_resampled")
    .fillna(0, subset=["sum(dollars)"])  # empty buckets sum to null, so replace with 0
    .show(truncate=False)
)
Output
+-------------------+------------+
|ts_resampled       |sum(dollars)|
+-------------------+------------+
|2017-03-10 12:27:00|30          |
|2017-03-10 12:27:30|25          |
|2017-03-10 12:28:00|0           |
|2017-03-10 12:28:30|0           |
|2017-03-10 12:29:00|200         |
+-------------------+------------+
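A sketch of an alternative that keeps the built-in window aggregation from the question: build the complete 30-second grid with sequence and explode, then left-join the windowed sums onto it. This assumes the question's sdf DataFrame, an active SparkSession, and Spark 2.4+ (where sequence accepts timestamp bounds); the names agg, bounds, grid, and w are only illustrative:

from pyspark.sql import functions as F

seconds = 30

# Aggregate with the built-in window function, keeping only the window start.
agg = (
    sdf.groupBy(F.window("timestamp", f"{seconds} seconds").alias("w"))
    .agg(F.sum("dollars").alias("dollars"))
    .select(F.col("w.start").alias("ts_resampled"), "dollars")
)

# Find the first and last window start, then expand them into the full grid
# of 30-second steps with sequence + explode.
bounds = agg.agg(
    F.min("ts_resampled").alias("lo"), F.max("ts_resampled").alias("hi")
)
grid = bounds.select(
    F.explode(
        F.sequence("lo", "hi", F.expr(f"INTERVAL {seconds} SECONDS"))
    ).alias("ts_resampled")
)

# Left-join the sums onto the grid so empty windows survive, then fill them with 0.
(
    grid.join(agg, "ts_resampled", "left")
    .fillna(0, subset=["dollars"])
    .orderBy("ts_resampled")
    .show(truncate=False)
)

The trade-off is mostly stylistic: the epoch version above works in integer seconds, while this one stays in timestamp types and keeps the window semantics from the question.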