Using Databricks, Spark 3.0.1
To use the legacy format, I have set: spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")
I have a dataframe similar to the sample below.
Each row needs to be split into several rows based on a change in consecutive values. The other columns can be filled with null.
Sample:
+----+----+----+----+----+----+----+----+----+----+
|id |t.n1|t.n2|t.n3|t.n4|t.n5|t.n6|t.n7|t.n8|t.n9|
+----+----+----+----+----+----+----+----+----+----+
|1 |100 |100 |100 |500 |500 |500 |200 |200 |200 |
|2 |100 |100 |700 |700 |700 |100 |100 |100 |100 |
+----+----+----+----+----+----+----+----+----+----+
Expected Output:
+----+----+----+----+----+----+----+----+----+----+
|id |t.n1|t.n2|t.n3|t.n4|t.n5|t.n6|t.n7|t.n8|t.n9|
+----+----+----+----+----+----+----+----+----+----+
|1 |100 |100 |100 |null|null|null|null|null|null|
|2 |null|null|null|500 |500 |500 |null|null|null|
|3 |null|null|null|null|null|null|200 |200 |200 |
|4 |100 |100 |null|null|null|null|null|null|null|
|5 |null|null|700 |700 |700 |null|null|null|null|
|6 |null|null|null|null|null|100 |100 |100 |100 |
+----+----+----+----+----+----+----+----+----+----+
Given this dataframe:
+---+---+---+---+---+---+---+---+---+---+
| id| n1| n2| n3| n4| n5| n6| n7| n8| n9|
+---+---+---+---+---+---+---+---+---+---+
| 1|100|100|100|500|500|500|200|200|200|
| 2|100|100|700|700|700|100|100|100|100|
+---+---+---+---+---+---+---+---+---+---+
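For reference, a minimal sketch of how that input could be built (assuming a SparkSession named spark with its implicits in scope, as on Databricks):

// Assumes a SparkSession named `spark` is available.
import spark.implicits._

val df = Seq(
  (1, 100, 100, 100, 500, 500, 500, 200, 200, 200),
  (2, 100, 100, 700, 700, 700, 100, 100, 100, 100)
).toDF("id", "n1", "n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9")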
I came up with a solution based on a mix of the dataframe and dataset APIs:
val l = 9 // number of value columns

df
  // put the value columns into a single array column
  .select($"id", array(df.columns.tail.map(col): _*).as("col"))
  // switch to the dataset api
  .as[(Int, Seq[Int])]
  .flatMap { case (id, arr) =>
    val arrI = arr.zipWithIndex
    // split the list into sublists of adjacent equal values
    arrI.tail
      .foldLeft(Seq(Seq(arrI.head)))((acc, curr) =>
        if (acc.last.last._1 == curr._1) {
          acc.init :+ (acc.last :+ curr)
        } else {
          acc :+ Seq(curr)
        }
      )
      // aggregate each sublist into (value, from, to)
      .map(chunk => (chunk.head._1, chunk.map(_._2).min, chunk.map(_._2).max))
      // generate new lists, filled with None outside [from, to]
      .zipWithIndex
      .map { case ((num, from, to), subI) =>
        (id, subI + 1, (0 until l).map(i => if (i >= from && i <= to) Some(num) else None))
      }
  }
  .toDF("id", "sub_id", "values") // back to the dataframe api
  // expand the array back into named columns
  .select($"id" +: $"sub_id" +: (0 until l).map(i => $"values"(i).as(s"n${i + 1}")): _*)
  .show(false)
which yields:
+---+------+----+----+----+----+----+----+----+----+----+
|id |sub_id|n1 |n2 |n3 |n4 |n5 |n6 |n7 |n8 |n9 |
+---+------+----+----+----+----+----+----+----+----+----+
|1 |1 |100 |100 |100 |null|null|null|null|null|null|
|1 |2 |null|null|null|500 |500 |500 |null|null|null|
|1 |3 |null|null|null|null|null|null|200 |200 |200 |
|2 |1 |100 |100 |null|null|null|null|null|null|null|
|2 |2 |null|null|700 |700 |700 |null|null|null|null|
|2 |3 |null|null|null|null|null|100 |100 |100 |100 |
+---+------+----+----+----+----+----+----+----+----+----+
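For illustration, the heart of the solution is the foldLeft over the zipped values, which groups adjacent equal elements into runs while keeping their indices. Isolated on a plain Seq (using the values of the first sample row), it behaves like this:

val arrI = Seq(100, 100, 100, 500, 500, 500, 200, 200, 200).zipWithIndex

// start a new run whenever the value differs from the previous one
val chunks = arrI.tail.foldLeft(Seq(Seq(arrI.head)))((acc, curr) =>
  if (acc.last.last._1 == curr._1) acc.init :+ (acc.last :+ curr)
  else acc :+ Seq(curr)
)
// chunks: List(List((100,0), (100,1), (100,2)),
//              List((500,3), (500,4), (500,5)),
//              List((200,6), (200,7), (200,8)))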
As the output above shows, I have not yet managed to produce the desired sequential id; that would need some more work. The problem is generating a consecutive id, which would require a wide transformation (a window function without partitioning) and would therefore become a performance bottleneck.