
Use Spark Scala to transform flat data into nested object

I need help converting a flat dataset into a nested format using Apache Spark / Scala.

Is it possible to automatically create a nested structure derived from the input column namespaces, e.g. [level 1].[level 2]? In my example, the nesting level is determined by the period character '.' in the column headers.

I assume this can be achieved with a map function, but I am open to alternative solutions, particularly if there is a more elegant way of achieving the same outcome.

package org.acme.au

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import scala.collection.Seq

object testNestedObject extends App {

  // Configure spark
  val spark = SparkSession.builder()
    .appName("Spark batch demo")
    .master("local[*]")
    .config("spark.driver.host", "localhost")
    .getOrCreate()

  // Start spark
  val sc = spark.sparkContext
  sc.setLogLevel("ERROR")

  // Define schema for input data
  val flatSchema = new StructType()
    .add(StructField("id", StringType, false))
    .add(StructField("name", StringType, false))
    .add(StructField("custom_fields.fav_colour", StringType, true))
    .add(StructField("custom_fields.star_sign", StringType, true))

  // Create a row with dummy data
  val row1 = Row("123456", "John Citizen", "Blue", "Scorpio")
  val row2 = Row("990087", "Jane Simth", "Green", "Taurus")
  val flatData = Seq(row1, row2)

  // Convert into dataframe
  val dfIn = spark.createDataFrame(spark.sparkContext.parallelize(flatData), flatSchema)

  // Print to console
  dfIn.printSchema()
  dfIn.show()

  // Convert flat data into nested structure as either Parquet or JSON format
  val dfOut = dfIn.rdd
    .map(
      row => ( /* TODO: Need help with mapping flat data to nested structure derived from input column namespaces
           * 
           * For example:
           * 
           * <id>123456</id>
           * <name>John Citizen</name>
           * <custom_fields>
           *   <fav_colour>Blue</fav_colour>
           *   <star_sign>Scorpio</star_sign>
           * </custom_fields>
           * 
           */ ))

  // Stop spark
  sc.stop()

}
asked Sep 06 '25 by Rushy Nova

2 Answers

This solution is for a revised requirement: the JSON output should consist of an array of {"key": k, "value": v} objects rather than a single {k1: v1, k2: v2, ...} object. For example:

// FROM:
"custom_fields":{"fav_colour":"Blue", "star_sign":"Scorpio"}

// TO:
"custom_fields":[{"key":"fav_colour", "value":"Blue"}, {"key":"star_sign", "value":"Scorpio"}]
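In plain Scala terms, this reshaping turns one key-to-value map into a sequence of {key, value} records. A minimal sketch of that shape change, independent of Spark, using the values from the question:

```scala
// Reshape a Map into a list of {"key": ..., "value": ...} records,
// mirroring the FROM/TO JSON shapes above.
val from = Map("fav_colour" -> "Blue", "star_sign" -> "Scorpio")

val to = from.toSeq.map { case (k, v) => Map("key" -> k, "value" -> v) }
// to == Seq(Map("key" -> "fav_colour", "value" -> "Blue"),
//           Map("key" -> "star_sign", "value" -> "Scorpio"))
```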

Sample code below:

import org.apache.spark.sql.functions._
import spark.implicits._  // needed for toDF on a local Seq

val dfIn = Seq(
  (123456, "John Citizen", "Blue", "Scorpio"),
  (990087, "Jane Simth", "Green", "Taurus")
).toDF("id", "name", "custom_fields.fav_colour", "custom_fields.star_sign")

val structCols = dfIn.columns.filter(_.contains("."))
// structCols: Array[String] =
//   Array(custom_fields.fav_colour, custom_fields.star_sign)

val structColsMap = structCols.map(_.split("\\.")).
  groupBy(_(0)).mapValues(_.map(_(1)))
// structColsMap: scala.collection.immutable.Map[String,Array[String]] =
//   Map(custom_fields -> Array(fav_colour, star_sign))

val dfExpanded = structColsMap.foldLeft(dfIn){ (accDF, kv) =>
  val cols = kv._2.map( v =>
    struct(lit(v).as("key"), col("`" + kv._1 + "." + v + "`").as("value"))
  )
  accDF.withColumn(kv._1, array(cols: _*))
}

val dfResult = structCols.foldLeft(dfExpanded)(_ drop _)

dfResult.show(false)
// +------+------------+----------------------------------------+
// |id    |name        |custom_fields                           |
// +------+------------+----------------------------------------+
// |123456|John Citizen|[[fav_colour,Blue], [star_sign,Scorpio]]|
// |990087|Jane Simth  |[[fav_colour,Green], [star_sign,Taurus]]|
// +------+------------+----------------------------------------+

dfResult.printSchema
// root
//  |-- id: integer (nullable = false)
//  |-- name: string (nullable = true)
//  |-- custom_fields: array (nullable = false)
//  |    |-- element: struct (containsNull = false)
//  |    |    |-- key: string (nullable = false)
//  |    |    |-- value: string (nullable = true)

dfResult.toJSON.show(false)
// +-------------------------------------------------------------------------------------------------------------------------------+
// |value                                                                                                                          |
// +-------------------------------------------------------------------------------------------------------------------------------+
// |{"id":123456,"name":"John Citizen","custom_fields":[{"key":"fav_colour","value":"Blue"},{"key":"star_sign","value":"Scorpio"}]}|
// |{"id":990087,"name":"Jane Simth","custom_fields":[{"key":"fav_colour","value":"Green"},{"key":"star_sign","value":"Taurus"}]}  |
// +-------------------------------------------------------------------------------------------------------------------------------+

Note that we cannot make the value type Any to accommodate a mix of different types, as the Spark DataFrame API doesn't support type Any. Consequently, the values in the array must all be of a single type (e.g. String). Like the previous solution, this handles only one level of nesting.
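The dot-splitting and grouping step used above is plain Scala and can be sanity-checked without a Spark session. A minimal sketch, assuming the column names from the question:

```scala
// Group the leaf names of dotted columns under their top-level prefix.
// No Spark required for this step.
val cols = Seq("id", "name", "custom_fields.fav_colour", "custom_fields.star_sign")

val nested: Map[String, Seq[String]] = cols
  .filter(_.contains("."))  // keep only the dotted (nested) columns
  .map(_.split("\\."))      // "a.b" -> Array("a", "b")
  .groupBy(_(0))            // group by the top-level prefix
  .map { case (k, vs) => k -> vs.map(_(1)).toSeq }
// nested == Map("custom_fields" -> Seq("fav_colour", "star_sign"))
```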

answered Sep 09 '25 by Leo C


This can be solved with a dedicated case class and a UDF that converts the input data into case class instances. For example:

Define the case class

case class NestedFields(fav_colour: String, star_sign: String)

Define the UDF that takes the original column values as input and returns an instance of NestedFields:

import org.apache.spark.sql.functions.udf

val asNestedFields = udf((fc: String, ss: String) => NestedFields(fc, ss))

Transform the original DataFrame and drop the flat columns (the $ column syntax requires import spark.implicits._):

val res = dfIn.withColumn("custom_fields", asNestedFields($"`custom_fields.fav_colour`", $"`custom_fields.star_sign`"))
              .drop($"`custom_fields.fav_colour`")
              .drop($"`custom_fields.star_sign`")

It produces

root
 |-- id: string (nullable = false)
 |-- name: string (nullable = false)
 |-- custom_fields: struct (nullable = true)
 |    |-- fav_colour: string (nullable = true)
 |    |-- star_sign: string (nullable = true)

+------+------------+---------------+
|    id|        name|  custom_fields|
+------+------------+---------------+
|123456|John Citizen|[Blue, Scorpio]|
|990087|  Jane Simth|[Green, Taurus]|
+------+------------+---------------+
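The UDF body itself is just per-row packing: two flat values go in, one case-class instance comes out, and Spark encodes the case class as a struct column. A minimal Spark-free sketch of that per-row step, using values from the question:

```scala
// Per-row equivalent of the UDF: two flat strings in, one struct-like
// case-class value out. Field names follow the answer above.
case class NestedFields(fav_colour: String, star_sign: String)

val packed = NestedFields("Blue", "Scorpio")
// packed.fav_colour == "Blue"; packed.star_sign == "Scorpio"
```

Note that in Spark 2.x the same struct column can also be built without a UDF, using the built-in struct function as shown in the other answer.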
answered Sep 09 '25 by Antot