 

Converting Spark-kafka InputDStream to Array[Bytes]

I am using Scala and consuming data from Kafka with the Spark Streaming approach below:

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

The variable above returns an InputDStream, through which I can see the data in raw/binary format using: println(line)

But I need to apply an Avro format (the schema is available) to the raw/binary data in order to see it in the expected JSON format. To apply the Avro schema, I need to convert the InputDStream above to Array[Byte], which is what Avro consumes.

Can someone please let me know how to convert an InputDStream to Array[Byte]?

Or

If you know a better way to apply an Avro schema to an InputDStream (in Spark Streaming), please share.

asked Dec 08 '25 by k_b

1 Answer

You need to do two things. The first is to use the DefaultDecoder for Kafka, which gives you an Array[Byte] for the value type:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

val lines: DStream[(String, Array[Byte])] =
  KafkaUtils
    .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)

And then you need to apply your Avro deserialization logic via an additional map:

lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) }

Where avroDeserializer is an arbitrary class of yours which knows how to create your type from Avro bytes.
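For concreteness, here is a minimal sketch of what such a deserializer could look like using Avro's own GenericDatumReader; the class name AvroDeserializer and the idea of passing the schema as a JSON string are assumptions for illustration, not something prescribed by the answer. A GenericRecord's toString happens to render as JSON, which matches the "expected json format" the question asks for.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Hypothetical helper: deserializes Avro-encoded bytes into GenericRecords.
// Schema is not Serializable, so it is rebuilt lazily on each executor
// from the schema's JSON representation.
class AvroDeserializer(schemaJson: String) extends Serializable {
  @transient private lazy val schema: Schema =
    new Schema.Parser().parse(schemaJson)
  @transient private lazy val reader =
    new GenericDatumReader[GenericRecord](schema)

  def deserialize(bytes: Array[Byte]): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }
}
```

With this sketch, `lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes).toString }` would yield a DStream of JSON strings.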

I personally use avro4s to get case class deserialization via macros.
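As a rough sketch of that approach, assuming a hypothetical case class UserEvent: avro4s derives the Avro schema and record mapping from the case class at compile time, so the per-record code reduces to reading from a binary stream. Note that the exact AvroInputStream factory methods differ between avro4s versions; the form below follows the older 1.x-style API.

```scala
import com.sksamuel.avro4s.AvroInputStream

// Hypothetical payload type; the schema is derived from it by avro4s macros.
case class UserEvent(id: String, amount: Double)

def fromAvro(bytes: Array[Byte]): UserEvent = {
  val in = AvroInputStream.binary[UserEvent](bytes)
  try in.iterator.next()
  finally in.close()
}
```

This would slot into the map above as `lines.map { case (_, bytes) => fromAvro(bytes) }`, giving a DStream[UserEvent] instead of raw GenericRecords.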

answered Dec 10 '25 by Yuval Itzchakov