I'm using pyspark to process some streaming data coming in and I want to add a new column to my data frame with a 50-second moving average.
I tried using a Window spec with rangeBetween:

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

w = (W.partitionBy(F.col("sender"))
     .orderBy(F.col("event_time").cast("long"))
     .rangeBetween(-50, 0))
df2 = df.withColumn("rolling_average", F.avg("fr").over(w))
But this raises an error, since Structured Streaming requires a time-based window (presumably to bound the state it has to keep):
AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets
Using the pyspark.sql.functions.window function I can also calculate a moving average, but this gives me the results grouped by a tumbling (or hopping) window per unique id key (called sender):

df.select("sender", "event_time", "fr") \
  .groupBy("sender", F.window("event_time", "50 seconds")) \
  .avg("fr")
| sender | window | avg(fr) |
|---|---|---|
| 59834cfd-6cb2-4ece-8353-0a9b20389656 | {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} | 0.17443667352199554 |
| 8b5d90b9-65d9-4dd2-b742-31c4f0ce37d6 | {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} | 0.010564474388957024 |
| a74204f3-e25d-4737-a302-9206cd69e90a | {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} | 0.16375258564949036 |
| db16426d-a9ba-449b-9777-3bdfadf0e0d9 | {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} | 0.17516431212425232 |
The tumbling window is obviously not what I want, and I would need to somehow join the result back to the original table. I'm also not sure how to define a sliding window based on the irregular event timestamps coming in.
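For what it's worth, the rangeBetween approach does work on a static DataFrame, so the window logic itself is sound; here is a minimal batch sketch with made-up toy data of the same shape:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window as W

spark = SparkSession.builder.getOrCreate()

# Toy batch data with the same schema as the stream (hypothetical values)
static_df = spark.createDataFrame(
    [("a", "2021-04-12 09:57:31", 0.1),
     ("a", "2021-04-12 09:57:45", 0.3),
     ("a", "2021-04-12 09:58:40", 0.5)],
    ["sender", "event_time", "fr"],
).withColumn("event_time", F.to_timestamp("event_time"))

# Per-row 50-second look-back average; fine in batch, rejected in streaming
w = (W.partitionBy("sender")
     .orderBy(F.col("event_time").cast("long"))
     .rangeBetween(-50, 0))
static_df.withColumn("rolling_average", F.avg("fr").over(w)).show()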
Right now I'm thinking about writing a stateful function that stores a set of the previously received records in state and updates it for each new data point coming in. But this seems quite elaborate for such a common operation, which I expect can be done in an easier way.
EDIT: the current version of Spark (3.1.1) only allows arbitrary stateful functions (mapGroupsWithState / flatMapGroupsWithState) to be built in Java or Scala, not Python, because the state is managed on the JVM side.
Any thoughts on whether this is actually the correct way to go?
You are getting the exception because you are building the Window spec as you would for batch processing; that kind of non-time-based window is not supported on a streaming DataFrame.
In the Structured Streaming Programming Guide, section Window Operations on Event Time, an example is given that can be adapted to your use case:
from pyspark.sql.functions import window

streamDf = ...  # streaming DataFrame of schema { event_time: Timestamp, sender: String, fr: Double }

# Group the data by sliding window and sender, and compute the average of each group
movingAverageDf = streamDf.groupBy(
    window(streamDf.event_time, "50 seconds", "5 seconds"),
    streamDf.sender
).avg("fr")
Keep in mind that without a watermark the internal state of your application will grow indefinitely, so it is recommended to also add one. Make sure to use the same event-time column in the watermark as you do in the window.
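A minimal sketch of how that could look, assuming events arrive at most 10 minutes late (the delay threshold is a value you would tune for your data):

from pyspark.sql.functions import window

movingAverageDf = (
    streamDf
    .withWatermark("event_time", "10 minutes")  # same column as used in window()
    .groupBy(window("event_time", "50 seconds", "5 seconds"), "sender")
    .avg("fr")
)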
Another note on the output modes of your streaming query: have a look at the overview in Output Modes to understand which modes are supported for your kind of query.
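For example, with a watermark in place a windowed aggregation supports append mode, where a window's result is emitted once the watermark closes it; a console sink is used here purely for illustration:

query = (
    movingAverageDf.writeStream
    .outputMode("append")  # "update" and "complete" are also valid for aggregations
    .format("console")
    .start()
)
query.awaitTermination()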