 

Aggregating on 5 minute windows in pyspark

I have the following dataframe df:

User | Datetime         | amount | length
A    | 2016-01-01 12:01 | 10     | 20
A    | 2016-01-01 12:03 | 6      | 10
A    | 2016-01-01 12:05 | 1      | 3
A    | 2016-01-01 12:06 | 3      | 5
B    | 2016-01-01 12:01 | 10     | 20
B    | 2016-01-01 12:02 | 8      | 20

I want to use pyspark to aggregate efficiently over 5-minute time windows and do some calculations - for example, compute the average amount and length for every user in each 5-minute window - so the resulting df would look like this:

User | Datetime         | amount | length
A    | 2016-01-01 12:00 | 8      | 15
B    | 2016-01-01 12:00 | 9      | 20
A    | 2016-01-01 12:05 | 2      | 4

How can I achieve this in the most efficient way? In pandas I used:

df.groupby(['cs_username', pd.TimeGrouper('5Min')]).apply(...)
asked by Menkes
1 Answer

Unfortunately, in pyspark this won't look as neat as in pandas ;-) You can try converting the datetime to a Unix timestamp and using the modulo operator, for example:

import pyspark.sql.functions as F

seconds = 300  # 5-minute buckets
# Floor each timestamp to the start of its 5-minute window
# (assumes 'Datetime' is a timestamp or a string Spark can parse)
seconds_window = F.from_unixtime(F.unix_timestamp('Datetime') - F.unix_timestamp('Datetime') % seconds)
df = df.withColumn('5_minutes_window', seconds_window)

Then you can simply group by the new column and perform the requested aggregations.
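
For completeness, here is a minimal sketch of that grouping step, assuming the '5_minutes_window' column created above and the averages asked for in the question (the window() alternative in the comment is Spark's built-in function for Spark 2.0+, not part of the original answer):

result = (df
    .groupBy('User', '5_minutes_window')
    .agg(F.avg('amount').alias('amount'),
         F.avg('length').alias('length'))
    .orderBy('User', '5_minutes_window'))
result.show()

# On Spark 2.0+, the built-in window() function produces the same bucketing
# without manual modulo arithmetic (requires 'Datetime' to be a timestamp):
# df.groupBy('User', F.window('Datetime', '5 minutes')) \
#   .agg(F.avg('amount'), F.avg('length'))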

answered by Mariusz