I am using Scala and consuming data from Kafka with the following Spark Streaming approach:
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
The variable above is an InputDStream, and I can see the data in raw/binary format using println(line). But I need to apply the Avro format (the schema is available) to that raw/binary data in order to see it in the expected JSON format. To apply the Avro schema, I need to convert the stream's records to the Array[Byte] that Avro consumes.
Can someone please tell me how to convert an InputDStream's records to Array[Byte]?
Or, if you know a better way to apply an Avro schema to an InputDStream (from Spark Streaming), please share.
You need to do two things. The first is to use the DefaultDecoder for Kafka, which gives you an Array[Byte] for the value type:
val lines: DStream[(String, Array[Byte])] =
  KafkaUtils
    .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)
Then you need to apply your Avro deserialization logic via an additional map:
lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) }
Where avroDeserializer is an arbitrary class of yours which knows how to create your type from Avro bytes.
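For example, a minimal sketch of such a class using Avro's GenericDatumReader (the class name AvroDeserializer and the schema-as-JSON-string constructor are my own choices, not anything Spark or Kafka prescribes). Note the @transient lazy vals: Avro readers are not serializable, so each executor rebuilds them from the schema string after the closure is shipped:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Hypothetical helper: decodes Avro binary into a GenericRecord,
// then renders it as JSON (GenericRecord.toString emits JSON).
class AvroDeserializer(schemaJson: String) extends Serializable {
  @transient private lazy val schema = new Schema.Parser().parse(schemaJson)
  @transient private lazy val reader = new GenericDatumReader[GenericRecord](schema)

  def deserialize(bytes: Array[Byte]): String = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    val record  = reader.read(null, decoder)
    record.toString
  }
}
```

With this in place, lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) } yields a DStream[String] of JSON, which matches what the question asks for.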
I personally use avro4s to get case class deserialization via macros.
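As a sketch of that approach (the exact avro4s API varies by version; in the 1.x line it looks roughly like the following, and User is a hypothetical case class matching your schema):

```scala
import com.sksamuel.avro4s.AvroInputStream

case class User(name: String, age: Int)

// avro4s derives the schema and decoder for User via macros,
// so the bytes come back as typed case class instances.
def decode(bytes: Array[Byte]): Seq[User] = {
  val in = AvroInputStream.binary[User](bytes)
  try in.iterator.toList
  finally in.close()
}
```

This trades the GenericRecord/JSON view for compile-time-checked case classes, which is usually nicer once the schema is stable.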