I have a dataframe (df1) that lists some time frames:
| start | end | event_name |
|-------|-----|------------|
| 1 | 3 | name_1 |
| 3 | 5 | name_2 |
| 2 | 6 | name_3 |
Within each time frame, I would like to extract some data from another dataframe (df2). For example, I want to extend df1 with the average measurement from df2 inside the specified time range:
| timestamp | measurement |
|-----------|-------------|
| 1 | 5 |
| 2 | 7 |
| 3 | 5 |
| 4 | 9 |
| 5 | 2 |
| 6 | 7 |
| 7 | 8 |
I was thinking about a UDF that filters df2 by timestamp and computes the average. But a UDF cannot reference a second dataframe:
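```python
import pyspark.sql.functions as f

def get_avg(start, end):
    # df2 lives on the driver; referencing it inside a UDF is what fails
    return (df2.filter((df2.timestamp > start) & (df2.timestamp < end))
               .agg({"measurement": "avg"}))

udf_1 = f.udf(get_avg)
df1.select(udf_1('start', 'end')).show()
```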
This throws the error `TypeError: cannot pickle '_thread.RLock' object`.
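Why this fails: to run a UDF on the executors, PySpark must serialize the function (it uses cloudpickle for this) together with everything its body references, and here that includes `df2`. A DataFrame is only a driver-side handle, and judging by the error its object graph contains a thread lock, which cannot be pickled. The minimal sketch below reproduces the same error with plain `pickle`; `DriverHandle` and `try_to_ship` are hypothetical names standing in for the DataFrame and for the serialization step, not PySpark internals.

```python
import pickle
import threading


class DriverHandle:
    """Hypothetical stand-in for a driver-side object (such as a DataFrame's
    session or gateway) that holds a thread lock internally."""

    def __init__(self):
        self._lock = threading.RLock()


def try_to_ship(obj):
    """Serialize obj, as captured UDF state must be before it can run on an
    executor. Return the error message on failure, or None on success."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return str(exc)
    return None


print(try_to_ship(DriverHandle()))        # fails: the lock cannot be pickled
print(try_to_ship({"start": 1, "end": 3}))  # plain values serialize fine
```

On Python 3.8 and later the first call reports `cannot pickle '_thread.RLock' object`, the same message PySpark surfaces. Plain column values are serializable, which is why UDFs work on columns but not on other dataframes.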
How would I solve this issue efficiently?
In this case there is no need for a UDF: you can simply join the two dataframes on a range condition built from the timestamps, then group by event and average the matching measurements.
```python
import pyspark.sql.functions as F

# Keep every df2 row whose timestamp lies strictly inside an event's window,
# then average the matching measurements per event.
(df1.join(df2, on=[(df2.timestamp > df1.start) & (df2.timestamp < df1.end)])
    .groupby('start', 'end', 'event_name')
    .agg(F.mean('measurement').alias('avg'))
    .show())
```

```
+-----+---+----------+-----------------+
|start|end|event_name|              avg|
+-----+---+----------+-----------------+
|    1|  3|    name_1|              7.0|
|    3|  5|    name_2|              9.0|
|    2|  6|    name_3|5.333333333333333|
+-----+---+----------+-----------------+
```
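Because the join condition uses strict `>` and `<`, the boundary timestamps are excluded: `name_1` (1 to 3) only matches timestamp 2, hence 7.0. The semantics can be checked without Spark using the minimal pure-Python sketch below. The function name `interval_averages` and its `inclusive` flag are hypothetical, and the nested loop only illustrates what the non-equi join computes, not how Spark executes it.

```python
def interval_averages(events, measurements, inclusive=False):
    """Average measurement per event over its time window.

    events: list of (start, end, event_name) tuples (like df1)
    measurements: list of (timestamp, measurement) tuples (like df2)
    inclusive=False mirrors the strict > / < join condition; True includes
    the window boundaries. Events with no matching timestamps are dropped,
    as an inner join would drop them.
    """
    result = {}
    for start, end, name in events:
        if inclusive:
            values = [m for t, m in measurements if start <= t <= end]
        else:
            values = [m for t, m in measurements if start < t < end]
        if values:
            result[name] = sum(values) / len(values)
    return result


df1_rows = [(1, 3, "name_1"), (3, 5, "name_2"), (2, 6, "name_3")]
df2_rows = [(1, 5), (2, 7), (3, 5), (4, 9), (5, 2), (6, 7), (7, 8)]

print(interval_averages(df1_rows, df2_rows))
# {'name_1': 7.0, 'name_2': 9.0, 'name_3': 5.333333333333333}
print(interval_averages(df1_rows, df2_rows, inclusive=True))
```

To include the boundaries in Spark, use `>=` and `<=` in the join condition. To keep events whose window contains no timestamps (with a null `avg`), pass `how="left"` to `join`.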