I have a dataframe storing levels per quarter, df1:
| id | year | quarter | level |
|-----|------|---------|--------|
| 111 | 2021 | 1 | Silver |
| 111 | 2021 | 2 | Gold |
| 222 | 2021 | 1 | Bronze |
| 222 | 2021 | 2 | Silver |
I also have another dataframe, storing the same data but not grouped by quarter, df2:
| id | level |
|-----|--------|
| 111 | Bronze |
| 222 | Gold |
I want to calculate the max level across both dataframes, but I can't simply use max because string ordering puts (g)old before (s)ilver. Is there a way to do a custom max that captures the rule gold > silver > bronze?
My expected output would look like this:
| id | year | quarter | level |
|-----|------|---------|--------|
| 111 | 2021 | 1 | Silver |
| 111 | 2021 | 2 | Gold |
| 222 | 2021 | 1 | Gold |
| 222 | 2021 | 2 | Gold |
I tried this before running into the issue:
output = (
    df1.join(df2, on=['id'])
    .groupby('id', 'year', 'quarter')
    .agg(
        F.max(F.col('level')).alias('level')  # would rank Silver greater than Gold
    )
)
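For reference, the sample frames above can be recreated like this (assuming an active SparkSession named spark):
df1 = spark.createDataFrame(
    [(111, 2021, 1, 'Silver'), (111, 2021, 2, 'Gold'),
     (222, 2021, 1, 'Bronze'), (222, 2021, 2, 'Silver')],
    ['id', 'year', 'quarter', 'level']
)
df2 = spark.createDataFrame([(111, 'Bronze'), (222, 'Gold')], ['id', 'level'])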
You can map each level to a numeric rank, use greatest to take the higher rank across the two dataframes, and then map that rank back to a level name via an array ordered from lowest to highest:
import pyspark.sql.functions as F

df = df1.alias('df1').join(df2.alias('df2'), 'id').select(
    'id', 'year', 'quarter',
    F.expr("""
        array('Bronze', 'Silver', 'Gold')[
            greatest(
                map('Bronze', 0, 'Silver', 1, 'Gold', 2)[df1.level],
                map('Bronze', 0, 'Silver', 1, 'Gold', 2)[df2.level]
            )
        ] as level
    """)
)
df.show()
+---+----+-------+------+
| id|year|quarter| level|
+---+----+-------+------+
|111|2021| 1|Silver|
|111|2021| 2| Gold|
|222|2021| 1| Gold|
|222|2021| 2| Gold|
+---+----+-------+------+
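If you prefer to stay in the DataFrame API instead of an expr string, the same idea can be sketched as follows (a sketch only; it reuses the df1/df2 aliases and the level-to-rank mapping from the snippet above):
import pyspark.sql.functions as F

# level names ordered from lowest to highest, and their ranks
levels = F.array(*[F.lit(x) for x in ['Bronze', 'Silver', 'Gold']])
rank = F.create_map(*[F.lit(x) for x in ['Bronze', 0, 'Silver', 1, 'Gold', 2]])

df = df1.alias('df1').join(df2.alias('df2'), 'id').select(
    'id', 'year', 'quarter',
    # greatest rank across both dataframes, mapped back to its level name
    levels[F.greatest(rank[F.col('df1.level')], rank[F.col('df2.level')])].alias('level')
)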
For newer Spark versions (2.4+), you can use array_position instead of a hard-coded map; note that array_position is 1-based, hence the - 1:
df = df1.alias('df1').join(df2.alias('df2'), 'id').withColumn(
    'mapping',
    F.expr("array('Bronze', 'Silver', 'Gold')")
).select(
    'id', 'year', 'quarter',
    F.col('mapping')[
        F.expr("greatest(array_position(mapping, df1.level), array_position(mapping, df2.level)) - 1")
    ].alias('level')
)
You can also define your custom ordering using a when expression and use the greatest function on structs to get the max:
import pyspark.sql.functions as F

order = (F.when(F.col("level") == "Gold", 3)
         .when(F.col("level") == "Silver", 2)
         .when(F.col("level") == "Bronze", 1))

df1 = df1.withColumn("level", F.struct(order, F.col("level")))
df2 = df2.withColumn("level", F.struct(order, F.col("level")))

result = df1.alias("df1").join(df2.alias("df2"), ["id"]).select(
    "id", "year", "quarter",
    F.greatest(F.col("df1.level"), F.col("df2.level")).getField("level").alias("level")
)
result.show()
# +---+----+-------+------+
# | id|year|quarter| level|
# +---+----+-------+------+
# |222|2021| 1| Gold|
# |222|2021| 2| Gold|
# |111|2021| 1|Silver|
# |111|2021| 2| Gold|
# +---+----+-------+------+
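The struct trick works because Spark orders structs field by field, left to right, so greatest compares the numeric order first and only falls back to the level name on ties. A quick sanity check in plain SQL (assuming an active SparkSession named spark):
# the struct with the higher first field should win, i.e. (3, Gold)
spark.sql("SELECT greatest(struct(2, 'Silver'), struct(3, 'Gold')) AS winner").show(truncate=False)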
Or, using a map literal to define the ordering and getting the max the same way with greatest on structs:
order = F.create_map(*[F.lit(l) for l in ['Gold', 3, 'Silver', 2, 'Bronze', 1]])

df1 = df1.withColumn("level", F.struct(order.getItem(F.col("level")), F.col("level")))
df2 = df2.withColumn("level", F.struct(order.getItem(F.col("level")), F.col("level")))

result = df1.alias("df1").join(df2.alias("df2"), ["id"]).select(
    "id", "year", "quarter",
    F.greatest(F.col("df1.level"), F.col("df2.level")).getField("level").alias("level")
)
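Calling result.show() should give the same output as the previous variant; create_map just builds the lookup as a single literal map column instead of a when chain. Note that both variants overwrite level with a struct, so apply only one of them to the original df1 and df2.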