I need help converting a flat dataset into a nested format using Apache Spark / Scala.
Is it possible to automatically create a nested structure derived from input column namespaces
[level 1].[level 2]? In my example, the nesting level is determined by the period symbol '.' within the column headers.
I assuming this is possible to achieve using a map function. I am open to alternative solutions, particularly if there is a more elegant way of achieving the same outcome.
package org.acme.au
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import scala.collection.Seq
object testNestedObject extends App {
  // Configure spark
  val spark = SparkSession.builder()
    .appName("Spark batch demo")
    .master("local[*]")
    .config("spark.driver.host", "localhost")
    .getOrCreate()
  // Start spark
  val sc = spark.sparkContext
  sc.setLogLevel("ERROR")
  val sqlContext = new SQLContext(sc)
  // Define schema for input data
  val flatSchema = new StructType()
    .add(StructField("id", StringType, false))
    .add(StructField("name", StringType, false))
    .add(StructField("custom_fields.fav_colour", StringType, true))
    .add(StructField("custom_fields.star_sign", StringType, true))
  // Create a row with dummy data
  val row1 = Row("123456", "John Citizen", "Blue", "Scorpio")
  val row2 = Row("990087", "Jane Simth", "Green", "Taurus")
  val flatData = Seq(row1, row2)
  // Convert into dataframe
  val dfIn = spark.createDataFrame(spark.sparkContext.parallelize(flatData), flatSchema)
  // Print to console
  dfIn.printSchema()
  dfIn.show()
  // Convert flat data into nested structure as either Parquet or JSON format
  val dfOut = dfIn.rdd
    .map(
      row => ( /* TODO: Need help with mapping flat data to nested structure derived from input column namespaces
           * 
           * For example:
           * 
           * <id>12345<id>
           * <name>John Citizen</name>
           * <custom_fields>
           *   <fav_colour>Blue</fav_colour>
           *   <star_sign>Scorpio</star_sign>
           * </custom_fields>
           * 
           */ ))
  // Stop spark
  sc.stop()
}
This solution is for the revised requirement that the JSON output would consist of an array of {K:valueK, V:valueV} rather than {valueK1: valueV1, valueK2: valueV2, ...}.  For example:
// FROM:
"custom_fields":{"fav_colour":"Blue", "star_sign":"Scorpio"}
// TO:
"custom_fields":[{"key":"fav_colour", "value":"Blue"}, {"key":"star_sign", "value":"Scorpio"}]
Sample code below:
import org.apache.spark.sql.functions._
val dfIn = Seq(
  (123456, "John Citizen", "Blue", "Scorpio"),
  (990087, "Jane Simth", "Green", "Taurus")
).toDF("id", "name", "custom_fields.fav_colour", "custom_fields.star_sign")
val structCols = dfIn.columns.filter(_.contains("."))
// structCols: Array[String] =
//   Array(custom_fields.fav_colour, custom_fields.star_sign)
val structColsMap = structCols.map(_.split("\\.")).
  groupBy(_(0)).mapValues(_.map(_(1)))
// structColsMap: scala.collection.immutable.Map[String,Array[String]] =
//   Map(custom_fields -> Array(fav_colour, star_sign))
val dfExpanded = structColsMap.foldLeft(dfIn){ (accDF, kv) =>
  val cols = kv._2.map( v =>
    struct(lit(v).as("key"), col("`" + kv._1 + "." + v + "`").as("value"))
  )
  accDF.withColumn(kv._1, array(cols: _*))
}
val dfResult = structCols.foldLeft(dfExpanded)(_ drop _)
dfResult.show(false)
// +------+------------+----------------------------------------+
// |id    |name        |custom_fields                           |
// +------+------------+----------------------------------------+
// |123456|John Citizen|[[fav_colour,Blue], [star_sign,Scorpio]]|
// |990087|Jane Simth  |[[fav_colour,Green], [star_sign,Taurus]]|
// +------+------------+----------------------------------------+
dfResult.printSchema
// root
//  |-- id: integer (nullable = false)
//  |-- name: string (nullable = true)
//  |-- custom_fields: array (nullable = false)
//  |    |-- element: struct (containsNull = false)
//  |    |    |-- key: string (nullable = false)
//  |    |    |-- value: string (nullable = true)
dfResult.toJSON.show(false)
// +-------------------------------------------------------------------------------------------------------------------------------+
// |value                                                                                                                          |
// +-------------------------------------------------------------------------------------------------------------------------------+
// |{"id":123456,"name":"John Citizen","custom_fields":[{"key":"fav_colour","value":"Blue"},{"key":"star_sign","value":"Scorpio"}]}|
// |{"id":990087,"name":"Jane Simth","custom_fields":[{"key":"fav_colour","value":"Green"},{"key":"star_sign","value":"Taurus"}]}  |
// +-------------------------------------------------------------------------------------------------------------------------------+
Note that we cannot make value type Any to accommodate a mix of different types, as Spark DataFrame API doesn't support type Any.  As a consequence, the value in the array must be of a given type (e.g. String).  Like the previous solution, this also handles only up to one nested level.
This can be solved with a dedicated case class and a UDF that converts the input data into case class instances. For example:
Define the case class
case class NestedFields(fav_colour: String, star_sign: String)
Define the UDF that takes the original column values as input and returns an instance of NestedFields:
private val asNestedFields = udf((fc: String, ss: String) => NestedFields(fc, ss))
Transform the original DataFrame and drop the flat columns:
val res = dfIn.withColumn("custom_fields", asNestedFields($"`custom_fields.fav_colour`", $"`custom_fields.star_sign`"))
              .drop($"`custom_fields.fav_colour`")
              .drop($"`custom_fields.star_sign`")
It produces
root
|-- id: string (nullable = false)
|-- name: string (nullable = false)
|-- custom_fields: struct (nullable = true)
|    |-- fav_colour: string (nullable = true)
|    |-- star_sign: string (nullable = true)
+------+------------+---------------+
|    id|        name|  custom_fields|
+------+------------+---------------+
|123456|John Citizen|[Blue, Scorpio]|
|990087|  Jane Simth|[Green, Taurus]|
+------+------------+---------------+
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With