I need to group by "KEY" Column and need to check whether "TYPE_CODE" column has both "PL" and "JL" values , if so then i need to add a Indicator Column as "Y" else "N"
Example :
//Input Values
val values = List(List("66","PL") ,
List("67","JL") , List("67","PL"),List("67","PO"),
List("68","JL"),List("68","PO")).map(x =>(x(0), x(1)))
import spark.implicits._
//created a dataframe
val cmc = values.toDF("KEY","TYPE_CODE")
cmc.show(false)
------------------------
KEY |TYPE_CODE |
------------------------
66 |PL |
67 |JL |
67 |PL |
67 |PO |
68 |JL |
68 |PO |
-------------------------
Expected Output :
For each "KEY", If it has "TYPE_CODE" has both PL & JL then Y else N
-----------------------------------------------------
KEY |TYPE_CODE | Indicator
-----------------------------------------------------
66 |PL | N
67 |JL | Y
67 |PL | Y
67 |PO | Y
68 |JL | N
68 |PO | N
---------------------------------------------------
For example, 67 has both PL & JL - So "Y" 66 has only PL - So "N" 68 has only JL - So "N"
One option:
1) collect TYPE_CODE as list;
2) check if it contains the specific strings;
3) then flatten the list with explode:
(cmc.groupBy("KEY")
.agg(collect_list("TYPE_CODE").as("TYPE_CODE"))
.withColumn("Indicator",
when(array_contains($"TYPE_CODE", "PL") && array_contains($"TYPE_CODE", "JL"), "Y").otherwise("N"))
.withColumn("TYPE_CODE", explode($"TYPE_CODE"))).show
+---+---------+---------+
|KEY|TYPE_CODE|Indicator|
+---+---------+---------+
| 68| JL| N|
| 68| PO| N|
| 67| JL| Y|
| 67| PL| Y|
| 67| PO| Y|
| 66| PL| N|
+---+---------+---------+
Another option:
Group by KEY and use agg to create two separate indicator columns (one for JL and on for PL), then calculate the combined indicator
join with the original DataFrame
Altogether:
val indicators = cmc.groupBy("KEY").agg(
sum(when($"TYPE_CODE" === "PL", 1).otherwise(0)) as "pls",
sum(when($"TYPE_CODE" === "JL", 1).otherwise(0)) as "jls"
).withColumn("Indicator", when($"pls" > 0 && $"jls" > 0, "Y").otherwise("N"))
val result = cmc.join(indicators, "KEY")
.select("KEY", "TYPE_CODE", "Indicator")
This might be slower than @Psidom's answer, but might be safer - collect_list might be problematic if you have a huge number of matches for a specific key (that list would have to be stored in a single worker's memory).
EDIT:
In case the input is known to be unique (i.e. JL / PL would only appear once per key, at most), indicators could be created using simple count aggregation, which is (arguably) easier to read:
val indicators = cmc
.where($"TYPE_CODE".isin("PL", "JL"))
.groupBy("KEY").count()
.withColumn("Indicator", when($"count" === 2, "Y").otherwise("N"))
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With