I have a spark 2.0 application that reads messages from kafka using spark streaming (with spark-streaming-kafka-0-10_2.11).
Structured streaming looks really cool so I wanted to try and migrate the code but I can't figure out how to use it.
in the regular streaming I used kafkaUtils to createDstrean and in the parameters I passed it was the value deserializer.
in the Structured streaming the doc says that I should deserialize using DataFrame functions but I can't figure exactly what that means.
I looked at examples such as this example but my Avro object in Kafka is quit complex and cannot be simply casted like the String in the example..
So far I tried this kind of code (which I saw here in a different question):
import spark.implicits._
  val ds1 = spark.readStream.format("kafka").
    option("kafka.bootstrap.servers","localhost:9092").
    option("subscribe","RED-test-tal4").load()
  ds1.printSchema()
  ds1.select("value").printSchema()
  val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()  
  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()
and I get "data type mismatch: cannot cast BinaryType to StructType(StructField(...."
how can I deserialize the value?
Kafka is a potential messaging and integration platform for Spark streaming. Kafka act as the central hub for real-time streams of data and are processed using complex algorithms in Spark Streaming.
Kafka analyses the events as they unfold. As a result, it employs a continuous (event-at-a-time) processing model. Spark, on the other hand, uses a micro-batch processing approach, which divides incoming streams into small batches for processing.
As noted above, as of Spark 2.1.0 there is support for avro with the batch reader but not with SparkSession.readStream(). Here is how I got it to work in Scala based on the other responses. I've simplified the schema for brevity.
package com.sevone.sparkscala.mypackage
import org.apache.spark.sql._
import org.apache.avro.io.DecoderFactory
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
object MyMain {
    // Create avro schema and reader
    case class KafkaMessage (
        deviceId: Int,
        deviceName: String
    )
    val schemaString = """{
        "fields": [
            { "name":  "deviceId",      "type": "int"},
            { "name":  "deviceName",    "type": "string"},
        ],
        "name": "kafkamsg",
        "type": "record"
    }"""
    val messageSchema = new Schema.Parser().parse(schemaString)
    val reader = new GenericDatumReader[GenericRecord](messageSchema)
    // Factory to deserialize binary avro data
    val avroDecoderFactory = DecoderFactory.get()
    // Register implicit encoder for map operation
    implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord]
    def main(args: Array[String]) {
        val KafkaBroker =  args(0);
        val InTopic = args(1);
        val OutTopic = args(2);
        // Get Spark session
        val session = SparkSession
                .builder
                .master("local[*]")
                .appName("myapp")
                .getOrCreate()
        // Load streaming data
        import session.implicits._
        val data = session
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", KafkaBroker)
                .option("subscribe", InTopic)
                .load()
                .select($"value".as[Array[Byte]])
                .map(d => {
                    val rec = reader.read(null, avroDecoderFactory.binaryDecoder(d, null))
                    val deviceId = rec.get("deviceId").asInstanceOf[Int]
                    val deviceName = rec.get("deviceName").asInstanceOf[org.apache.avro.util.Utf8].toString
                    new KafkaMessage(deviceId, deviceName)
                })
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With