
Spark / Scala: Split row into several rows based on value change in current row

Using Databricks, Spark 3.0.1

To use the legacy format, I have set: spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

I have a dataframe similar to the sample below.

Each row needs to be split into several rows whenever the value changes between consecutive columns. The remaining positions can be filled with null.

Sample:

+----+----+----+----+----+----+----+----+----+----+
|id  |t.n1|t.n2|t.n3|t.n4|t.n5|t.n6|t.n7|t.n8|t.n9|
+----+----+----+----+----+----+----+----+----+----+
|1   |100 |100 |100 |500 |500 |500 |200 |200 |200 |
|2   |100 |100 |700 |700 |700 |100 |100 |100 |100 |
+----+----+----+----+----+----+----+----+----+----+

Expected Output:

+----+----+----+----+----+----+----+----+----+----+   
|id  |t.n1|t.n2|t.n3|t.n4|t.n5|t.n6|t.n7|t.n8|t.n9|
+----+----+----+----+----+----+----+----+----+----+
|1   |100 |100 |100 |Nan |Nan |Nan |Nan |Nan |Nan |
|2   |Nan |Nan |Nan |500 |500 |500 |Nan |Nan |Nan |
|3   |Nan |Nan |Nan |Nan |Nan |Nan |200 |200 |200 |
|4   |100 |100 |Nan |Nan |Nan |Nan |Nan |Nan |Nan |
|5   |Nan |Nan |700 |700 |700 |Nan |Nan |Nan |Nan |
|6   |Nan |Nan |Nan |Nan |Nan |100 |100 |100 |100 |
+----+----+----+----+----+----+----+----+----+----+
asked by hiya


1 Answer

Given this dataframe (column names without the t. prefix):

+---+---+---+---+---+---+---+---+---+---+
| id| n1| n2| n3| n4| n5| n6| n7| n8| n9|
+---+---+---+---+---+---+---+---+---+---+
|  1|100|100|100|500|500|500|200|200|200|
|  2|100|100|700|700|700|100|100|100|100|
+---+---+---+---+---+---+---+---+---+---+
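
For reference, a minimal sketch to construct such an input DataFrame, assuming a notebook or shell session where spark is in scope and taking the column names n1..n9 from the table above:

// minimal sketch, assuming `spark` (a SparkSession) is available, as in a Databricks notebook
import spark.implicits._

val df = Seq(
  (1, 100, 100, 100, 500, 500, 500, 200, 200, 200),
  (2, 100, 100, 700, 700, 700, 100, 100, 100, 100)
).toDF("id", "n1", "n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9")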

I came up with a solution based on a mix of the DataFrame and Dataset APIs:

import org.apache.spark.sql.functions.{array, col}
import spark.implicits._ // for $-columns and Dataset encoders (pre-imported in Databricks notebooks)

val l = 9 // number of value columns (n1..n9)
df
  // put values into array
  .select($"id", array(df.columns.tail.map(col): _*).as("col"))
  // switch to dataset api
  .as[(Int, Seq[Int])]
  .flatMap { case (id, arr) => {
    val arrI = arr.zipWithIndex
    // split the list into sublists of consecutive equal values
    arrI.tail
      .foldLeft(Seq(Seq(arrI.head)))((acc, curr) =>
        if (acc.last.last._1 == curr._1) {
          acc.init :+ (acc.last :+ curr)
        } else {
          acc :+ Seq(curr)
        }
      )
      // aggregate each sublist into (value, fromIndex, toIndex)
      .map(chunk => (chunk.head._1, chunk.map(_._2).min, chunk.map(_._2).max))
      // generate new rows, filling the other positions with None
      .zipWithIndex
      .map { case ((num, from, to), subI) =>
        (id, subI + 1, (0 until l).map(i => if (i >= from && i <= to) Some(num) else None))
      }
  }
  }
  .toDF("id","sub_id","values") // back to dataframe api
  // rename columns
  .select($"id"+:$"sub_id"+:(0 until l).map(i => $"values"(i).as(s"n${i+1}")):_*)
  .show(false)

which yields:

+---+------+----+----+----+----+----+----+----+----+----+
|id |sub_id|n1  |n2  |n3  |n4  |n5  |n6  |n7  |n8  |n9  |
+---+------+----+----+----+----+----+----+----+----+----+
|1  |1     |100 |100 |100 |null|null|null|null|null|null|
|1  |2     |null|null|null|500 |500 |500 |null|null|null|
|1  |3     |null|null|null|null|null|null|200 |200 |200 |
|2  |1     |100 |100 |null|null|null|null|null|null|null|
|2  |2     |null|null|700 |700 |700 |null|null|null|null|
|2  |3     |null|null|null|null|null|100 |100 |100 |100 |
+---+------+----+----+----+----+----+----+----+----+----+
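
To see what the foldLeft grouping step does in isolation, here is a small standalone sketch on the first row's values (plain Scala collections, no Spark involved):

val arrI = Seq(100, 100, 100, 500, 500, 500, 200, 200, 200).zipWithIndex
val chunks = arrI.tail.foldLeft(Seq(Seq(arrI.head))) { (acc, curr) =>
  if (acc.last.last._1 == curr._1) acc.init :+ (acc.last :+ curr) // extend the current run
  else acc :+ Seq(curr)                                           // start a new run
}
// chunks == Seq(Seq((100,0), (100,1), (100,2)), Seq((500,3), (500,4), (500,5)), Seq((200,6), (200,7), (200,8)))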

As the output above shows, I have not yet managed to produce the expected id column; that would need some more work. The problem is generating a consecutive id across all rows: this requires a wide transformation (a window function without partitioning), which would be a performance bottleneck. A sketch of that approach follows.
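
If a single running id over all rows is really needed, one way (with the performance caveat above) is a row_number over an unpartitioned window. Here, result is a placeholder for the (id, sub_id, n1..n9) dataframe produced above, i.e. the pipeline assigned to a val instead of ending in .show:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Caveat: Window.orderBy without partitionBy moves all rows into a single partition.
// `result` is a placeholder name for the dataframe built by the pipeline above.
val renumbered = result
  .withColumn("new_id", row_number().over(Window.orderBy("id", "sub_id")))
  .drop("id", "sub_id")
  .withColumnRenamed("new_id", "id")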

answered by Raphael Roth


