 

PySpark: aggregate while finding the first value of the group

Suppose I have 5 TB of data with the following schema, and I am using PySpark.

| id | date | Month | KPI_1 | ... | KPI_n

For 90% of the KPIs, I only need the sum/min/max aggregated to the (id, Month) level. For the remaining 10%, I need the first value based on date.

One option is to use a window function. For example, I can do

from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy("id", "Month").orderBy(F.desc("date"))

# for the 90% kpi
agg_df = df.withColumn("kpi_1", F.sum("kpi_1").over(w))
agg_df = agg_df.withColumn("kpi_2", F.max("kpi_2").over(w))
agg_df = agg_df.withColumn("kpi_3", F.min("kpi_3").over(w))
...

# Select the last row of each window to get the final accumulated sum for the 90% KPIs
# and, for the 10% KPIs, the value in that last row (which equals the first value when ranked ascending).

# Continue processing agg_df with filters based on the sum/max/min values of the 90% KPIs.

But I am not sure how to select the last row of each window. Does anyone have suggestions, or is there a better way to aggregate?

asked Nov 16 '25 by Y_KL

1 Answer

Let's assume we have this data:

+---+----------+-------+-----+-----+
| id|      date|  month|kpi_1|kpi_2|
+---+----------+-------+-----+-----+
|  1|2000-01-01|2000-01|    1|  100|
|  1|2000-01-02|2000-01|    2|  200|
|  1|2000-01-03|2000-01|    3|  300|
|  1|2000-01-04|2000-01|    4|  400|
|  1|2000-01-05|2000-01|    5|  500|
|  1|2000-02-01|2000-02|   10|   11|
|  1|2000-02-02|2000-02|   20|   21|
|  1|2000-02-03|2000-02|   30|   31|
|  1|2000-02-04|2000-02|   40|   41|
+---+----------+-------+-----+-----+

and we want to calculate the min, max and sum for kpi_1 and get the last value of kpi_2 for each group.
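
If you want to follow along, here is a minimal sketch for building this sample frame (the SparkSession setup is an assumption, and the dates are kept as ISO-8601 strings, which sort chronologically):

from pyspark.sql import SparkSession, Window  # Window is used further below
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Sample data from the table above; ISO-8601 date strings sort in
# chronological order (converting with F.to_date would work as well).
df = spark.createDataFrame(
    [
        (1, "2000-01-01", "2000-01", 1, 100),
        (1, "2000-01-02", "2000-01", 2, 200),
        (1, "2000-01-03", "2000-01", 3, 300),
        (1, "2000-01-04", "2000-01", 4, 400),
        (1, "2000-01-05", "2000-01", 5, 500),
        (1, "2000-02-01", "2000-02", 10, 11),
        (1, "2000-02-02", "2000-02", 20, 21),
        (1, "2000-02-03", "2000-02", 30, 31),
        (1, "2000-02-04", "2000-02", 40, 41),
    ],
    ["id", "date", "month", "kpi_1", "kpi_2"],
)
df.show()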

Getting the min, max and sum of kpi_1 can be achieved by grouping the data by id and month. With Spark >= 3.0.0, max_by can be used to get the latest value of kpi_2:

df_avg = df \
    .groupBy("id","month") \
    .agg(F.sum("kpi_1"), F.min("kpi_1"), F.max("kpi_1"), F.expr("max_by(kpi_2, date)"))
df_avg.show()

prints

+---+-------+----------+----------+----------+-------------------+
| id|  month|sum(kpi_1)|min(kpi_1)|max(kpi_1)|max_by(kpi_2, date)|
+---+-------+----------+----------+----------+-------------------+
|  1|2000-02|       100|        10|        40|                 41|
|  1|2000-01|        15|         1|         5|                500|
+---+-------+----------+----------+----------+-------------------+

For Spark versions < 3.0.0, max_by is not available, so getting the last value of kpi_2 for each group is more difficult. A first idea could be to use the aggregation function first() on a descending-ordered data frame. A simple test gave me the correct result, but unfortunately the documentation states: "The function is non-deterministic because its results depends on order of rows which may be non-deterministic after a shuffle".
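
For illustration, that first() attempt would look roughly like the sketch below. It is shown only to make the caveat concrete; as the docs warn, the row order seen by first() after the sort is not guaranteed.

# Discouraged sketch: sort globally, then hope first() picks the value
# from the latest date. Spark does not guarantee this after a shuffle.
df_first_attempt = df \
    .orderBy(F.desc("date")) \
    .groupBy("id", "month") \
    .agg(F.first("kpi_2").alias("kpi_2"))
df_first_attempt.show()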

A better approach to get the last value of kpi_2 is to use a window, as shown in the question. The window function row_number() does the job:

w = Window.partitionBy("id", "month").orderBy(F.desc("date"))
df_first = df.withColumn("row_number", F.row_number().over(w)) \
    .where("row_number = 1") \
    .drop("row_number") \
    .select("id", "month", "kpi_2")
df_first.show()

prints

+---+-------+-----+
| id|  month|kpi_2|
+---+-------+-----+
|  1|2000-02|   41|
|  1|2000-01|  500|
+---+-------+-----+

Joining the first part (without the max_by column; see the sketch after the output below) and the second part gives the desired result:

df_result = df_avg.join(df_first, ['id', 'month'])
df_result.show()

prints

+---+-------+----------+----------+----------+-----+
| id|  month|sum(kpi_1)|min(kpi_1)|max(kpi_1)|kpi_2|
+---+-------+----------+----------+----------+-----+
|  1|2000-02|       100|        10|        40|   41|
|  1|2000-01|        15|         1|         5|  500|
+---+-------+----------+----------+----------+-----+
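
For completeness, on the Spark < 3.0 path the "first part without the max_by column" would presumably be a df_avg built without the max_by expression, roughly:

# Spark < 3.0 variant of df_avg: the same aggregations, just without max_by,
# then joined with df_first from the window step above.
df_avg = df \
    .groupBy("id", "month") \
    .agg(F.sum("kpi_1"), F.min("kpi_1"), F.max("kpi_1"))
df_result = df_avg.join(df_first, ["id", "month"])
df_result.show()
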
answered Nov 17 '25 by werner

