I am trying to read data from Kafka using Structured Streaming. The data received from Kafka is in JSON format. My code is as follows; I use the from_json function to convert the JSON into a DataFrame for further processing:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val schema: StructType = new StructType()
  .add("time", LongType)
  .add("id", LongType)
  .add("properties", new StructType()
    .add("$app_version", StringType)
    // ... more nested fields
  )
val df: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "...")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .select(from_json(col("value"), schema))
My problem is that when new fields are added to the JSON, I can't keep stopping the Spark program to add them to the schema by hand. How can I parse these fields dynamically? I tried schema_of_json(), but it only infers the field types from a single sample record, and it is not suitable for multi-level nested JSON data.
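For reference, this is roughly how I used schema_of_json (the sample record below is made up); it only sees the one literal example it is given, so fields that appear only in later records are never picked up:

import org.apache.spark.sql.functions.{lit, schema_of_json}

// Infers a DDL-formatted schema from a single hard-coded sample record
val sample = """{"time":1595808000,"id":1,"properties":{"$app_version":"1.0.0"}}"""
spark.range(1).select(schema_of_json(lit(sample))).show(false)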
> My problem is that when new fields are added to the JSON, I can't keep stopping the Spark program to add them to the schema by hand. How can I parse these fields dynamically?
It is not possible in Spark Structured Streaming (or even Spark SQL) out of the box. There are a couple of solutions though.
The simplest option is to stop your streaming query, change the code to match the current schema, and restart it. Structured Streaming supports this for data sources that can resume from a checkpoint, and the Kafka source does support it.
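As a sketch, resuming works because the query keeps its Kafka offsets in the checkpoint, so restarting with the same checkpointLocation after you redeploy the updated schema continues where the query stopped (the sink format and paths below are placeholders, not from the question):

// Placeholder sink and paths; the key part is checkpointLocation, which stores the Kafka
// offsets so the restarted query (now compiled with the updated schema) resumes from them.
val query = df.writeStream
  .format("parquet")
  .option("path", "/data/events")
  .option("checkpointLocation", "/checkpoints/kafka-json")
  .start()

query.awaitTermination()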
You could write a user-defined function (UDF) that would do this dynamic JSON parsing for you. That's also among the easiest options.
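A minimal sketch of that idea, assuming it is acceptable to flatten every record into a map of string values (the flatten helper, the dot-separated key scheme, and the parseJson name are mine, not an established API; Jackson is used because it already ships with Spark):

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.spark.sql.functions.{col, udf}
import scala.collection.JavaConverters._

// Recursively flattens a JSON tree into "a.b.c" -> value pairs;
// nested objects are recursed into, scalars become text, arrays are kept as raw JSON
def flatten(node: JsonNode, prefix: String = ""): Map[String, String] =
  if (node.isObject) {
    node.fields.asScala.flatMap { e =>
      val key = if (prefix.isEmpty) e.getKey else s"$prefix.${e.getKey}"
      flatten(e.getValue, key)
    }.toMap
  } else if (node.isValueNode) Map(prefix -> node.asText)
  else Map(prefix -> node.toString)

val parseJson = udf { json: String =>
  val mapper = new ObjectMapper() // created per call for simplicity; cache it in real code
  flatten(mapper.readTree(json))
}

// Applied to the raw Kafka value instead of from_json; new JSON fields simply show up as new map keys
val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "...")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .select(parseJson(col("value")).as("fields"))

You lose the strong typing of a StructType this way (every value is a string), but specific keys can still be pulled out with fields['properties.$app_version'] in a SQL expression, and newly added fields no longer require a restart.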
Another option is to create an extension to the built-in Kafka data source that would do the dynamic JSON parsing (similarly to Kafka deserializers). That requires a bit more development, but is certainly doable.