
How Structured Streaming dynamically parses Kafka's JSON data

I am trying to read data from Kafka using Structured Streaming. The data received from Kafka is in JSON format. My code is as follows; it uses the from_json function to convert the JSON into a DataFrame for further processing.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val schema: StructType = new StructType()
    .add("time", LongType)
    .add("id", LongType)
    .add("properties", new StructType()
      .add("$app_version", StringType)
      // ... (remaining nested fields elided) ...
    )
val df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "...")
    .option("subscribe", "...")
    .load()
    .selectExpr("CAST(value AS STRING) as value")
    .select(from_json(col("value"), schema))

My problem is that when new fields are added, I can't stop the Spark program to add them to the schema manually. How can I parse these fields dynamically? I tried schema_of_json(), but it only infers the field types from the first line, so it is not suitable for JSON data with multi-level nested structures.
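For illustration, here is a minimal sketch of the schema_of_json approach (the sample JSON literal below is made up): schema_of_json only accepts a foldable string, so the inferred schema is frozen to that one sample and never reflects fields that appear later in the stream.

import org.apache.spark.sql.functions.{lit, schema_of_json}

// schema_of_json infers a schema from this single literal sample only;
// fields that show up later in the stream are never picked up.
val sample = """{"time": 1, "id": 42, "properties": {"$app_version": "1.0"}}"""
val inferred = df.select(schema_of_json(lit(sample)).as("inferred_schema"))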

Sin asked Sep 18 '25


1 Answer

My problem is that when new fields are added, I can't stop the Spark program to add them to the schema manually. How can I parse these fields dynamically?

It is not possible in Spark Structured Streaming (or even Spark SQL) out of the box. There are a couple of solutions, though.

Changing Schema in Code and Resuming Streaming Query

You simply have to stop your streaming query, change the code to match the current schema, and resume it. This is possible in Spark Structured Streaming with data sources that support resuming from a checkpoint, and the Kafka data source does.
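A minimal sketch of that workflow (the sink format and the paths are example values, not from the question):

val query = df.writeStream
  .format("parquet")
  .option("path", "/data/events")                      // example sink path
  .option("checkpointLocation", "/checkpoints/events") // enables resuming
  .start()

// When new JSON fields appear: stop the query, extend `schema` in the code,
// redeploy, and start() again with the same checkpointLocation. The Kafka
// source resumes from the offsets recorded in the checkpoint, so no data is lost.
query.stop()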

User-Defined Function (UDF)

You could write a user-defined function (UDF) that would do this dynamic JSON parsing for you. That's also among the easiest options.
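For example, here is a sketch of such a UDF using json4s (which ships with Spark), applied to the stringified value column from the question. The flattening strategy and the Map[String, String] output type are assumptions for illustration, not the only way to do it:

import org.apache.spark.sql.functions.{col, udf}
import org.json4s._
import org.json4s.jackson.JsonMethods

// Sketch: flatten arbitrary nested JSON into a Map[String, String],
// so newly added fields show up as extra map keys without a schema change.
val parseJsonDynamically = udf { (json: String) =>
  def flatten(prefix: String, v: JValue): Map[String, String] = v match {
    case JObject(fields) =>
      fields.flatMap { case (name, value) =>
        flatten(if (prefix.isEmpty) name else s"$prefix.$name", value)
      }.toMap
    case JString(s) => Map(prefix -> s)
    case other      => Map(prefix -> JsonMethods.compact(JsonMethods.render(other)))
  }
  flatten("", JsonMethods.parse(json))
}

val parsed = df.select(parseJsonDynamically(col("value")).as("fields"))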

New Data Source (MicroBatchReader)

Another option is to create an extension to the built-in Kafka data source that would do the dynamic JSON parsing (similarly to Kafka deserializers). That requires a bit more development, but is certainly doable.
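A skeleton of such an extension against the Spark 2.4 DataSource V2 API (the class name is hypothetical, and the actual reader logic is left out):

import java.util.Optional
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
import org.apache.spark.sql.types.StructType

// Hypothetical source that wraps the built-in Kafka reader and re-infers
// the JSON schema (Kafka-deserializer style) on every micro-batch.
class DynamicJsonKafkaSource extends DataSourceV2 with MicroBatchReadSupport {
  override def createMicroBatchReader(
      schema: Optional[StructType],
      checkpointLocation: String,
      options: DataSourceOptions): MicroBatchReader = {
    // Delegate offset tracking to the Kafka source; parse each batch's
    // records with a schema inferred from that batch.
    ???  // implementation left out
  }
}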

Jacek Laskowski answered Sep 20 '25