Calculating a moving average column using pyspark structured streaming

I'm using pyspark to process some streaming data coming in and I want to add a new column to my data frame with a 50-second moving average.

I tried using a Window spec with rangeBetween:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = (Window
     .partitionBy(F.col("sender"))
     .orderBy(F.col("event_time").cast("long"))
     .rangeBetween(-50, 0))
df2 = df.withColumn("rolling_average", F.avg("fr").over(w))

But this gives me an error, as structured streaming requires a time-based window (probably to manage state):

AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets

Using the pyspark.sql.functions.window function I can also calculate a moving average, but this gives me the results grouped by a tumbling (or hopping) window and the unique id key called sender:

from pyspark.sql.functions import window

df.select("sender", "event_time", "fr").groupBy("sender", window("event_time", "50 seconds")).avg("fr")
sender                                window                                                                          avg(fr)
59834cfd-6cb2-4ece-8353-0a9b20389656  {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"}  0.17443667352199554
8b5d90b9-65d9-4dd2-b742-31c4f0ce37d6  {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"}  0.010564474388957024
a74204f3-e25d-4737-a302-9206cd69e90a  {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"}  0.16375258564949036
db16426d-a9ba-449b-9777-3bdfadf0e0d9  {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"}  0.17516431212425232

The tumbling window is obviously not what I want, and I would then need to somehow join the result back to the original table. I'm also not sure how to define a sliding window based on the irregular event timestamps coming in.

Right now I'm thinking about writing a stateful function that keeps a set of previously received records in state and updates it for each new data point coming in. But this seems quite elaborate for such a common task, which I expect can be done in an easier way.

EDIT: the current version of Spark (3.1.1) only allows arbitrary stateful functions to be written in Java or Scala, not Python, to safeguard the conversion to the JVM.

Any thoughts on whether this is actually the correct way to go?

1 Answer

You are getting the exception because you are building a Window spec for batch processing, which is not supported on a streaming DataFrame.

The Structured Streaming Programming Guide, in the section Window Operations on Event Time, gives an example that can be applied to your use case:

from pyspark.sql.functions import window

streamDf = ...  # streaming DataFrame of schema { event_time: Timestamp, sender: String, fr: Double }

# Group the data by a 50-second window sliding every 5 seconds and by sender,
# then compute the average of each group
movingAverageDf = streamDf.groupBy(
    window(streamDf.event_time, "50 seconds", "5 seconds"),
    streamDf.sender
).avg("fr")

Keep in mind that without a watermark the internal state of your application will grow indefinitely, so it is recommended to also add one. Make sure to use the same event-time column in the watermark as you do for the window.
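A minimal sketch of what that could look like, assuming events arrive at most one minute late (the threshold is an assumption, not a value from your question, so tune it to your data):

from pyspark.sql.functions import window

# Assumed lateness threshold of 1 minute: windows older than the watermark
# are finalized and their state is dropped.
movingAverageDf = (
    streamDf
    .withWatermark("event_time", "1 minute")  # same event-time column as the window
    .groupBy(
        window("event_time", "50 seconds", "5 seconds"),
        "sender")
    .avg("fr")
)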

One more note on output modes: have a look at the overview in the Output Modes section of the guide to understand which modes are supported for your streaming query.
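For illustration, a sketch of starting the query with the console sink: with a watermark in place, "append" emits each window once it is finalized, while "update" would emit rows whenever their average changes.

# Console sink for illustration only; swap in your real sink.
query = (movingAverageDf.writeStream
         .outputMode("append")
         .format("console")
         .start())
query.awaitTermination()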
