When trying to stream Avro data with Kafka Streams, I came across this error:
Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Even though I found several older threads about this in the mailing list, none of the solutions suggested there fixed the problem, so hopefully I can find a solution here.
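For context on the error itself: the Confluent Avro deserializer expects every value to start with a magic byte (0x0) followed by a 4-byte schema ID, and anything not written by the KafkaAvroSerializer (for example plain String or JSON records) fails with exactly this "Unknown magic byte!" message. A minimal sketch of that wire-format check (the helper is hypothetical, purely to illustrate the framing):

import java.nio.ByteBuffer

// Hypothetical helper, only to illustrate the Confluent Avro wire format:
// byte 0 = magic byte 0x0, bytes 1-4 = schema ID, rest = Avro-encoded payload.
def describePayload(bytes: Array[Byte]): String =
  if (bytes.length >= 5 && bytes(0) == 0x0) {
    val schemaId = ByteBuffer.wrap(bytes, 1, 4).getInt
    s"Confluent-Avro-framed payload, schema id $schemaId"
  } else {
    // Plain String/JSON producers do not prepend the magic byte,
    // which is what makes the Avro deserializer throw "Unknown magic byte!".
    "not Confluent-Avro framed (e.g. a plain String record)"
  }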
My setup looks as follows:
p.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
p.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde])
p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost:8081")
I already tried setting the KEY_SERDE to the same serde as the VALUE_SERDE; even though that was marked as a solution on the mailing list, it did not work in my case.
I'm generating a GenericData.Record with my schema as follows:
val record = new GenericData.Record(schema)
...
record.put(field, value)
When I run in debug mode and inspect the generated record, everything looks fine: there is data in the record and the mapping is correct.
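For reference, a fuller sketch of how such a record gets built (the schema and field names below are made up purely for illustration; the real schema comes from my application):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

// Hypothetical schema, just to make the snippet self-contained.
val schemaJson =
  """{"type": "record", "name": "Example", "fields": [
    |  {"name": "id",    "type": "string"},
    |  {"name": "count", "type": "int"}
    |]}""".stripMargin
val schema: Schema = new Schema.Parser().parse(schemaJson)

val record = new GenericData.Record(schema)
record.put("id", "abc-123") // placeholder field names and values
record.put("count", 42)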
I write the KStream to the output topic like this (I used branch earlier in the topology):
splitTopics.get(0).to(s"${destTopic}_Testing")
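For context, splitTopics holds the streams produced by that earlier branch step. A rough sketch of what it might look like (the stream name, predicates, and field name are placeholders, not my actual logic):

import org.apache.avro.generic.GenericData
import org.apache.kafka.streams.kstream.Predicate

// Assumed shape of the earlier branch step; the routing condition is a placeholder.
val splitTopics = recordStream.branch(
  new Predicate[String, GenericData.Record] {
    override def test(key: String, value: GenericData.Record): Boolean =
      value.get("recordType") == "Testing" // placeholder condition for the "_Testing" branch
  },
  new Predicate[String, GenericData.Record] {
    override def test(key: String, value: GenericData.Record): Boolean = true // everything else
  }
)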
I'm using GenericData.Record for the records. Might this be a problem in combination with the GenericAvroSerde?
The solution to my problem was to switch the value serde after deserializing the String values I get from my input topic.
Since a Serde combines both a serializer and a deserializer, I cannot simply use StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]. Instead, I have to use a String serde as the default to deserialize the input records, and only configure an Avro serde where I write out to the output topic.
Looks like this now:
import java.util.{Collections, Properties}

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.StreamsConfig
// plus the import for whichever GenericAvroSerde implementation you use

// default streams configuration serdes are different from the actual output configurations
val streamsConfiguration: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, kStreamsConf.getString("APPLICATION_ID"))
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kStreamsConf.getString("BOOTSTRAP_SERVERS_CONFIG"))
  p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kStreamsConf.getString("AUTO_OFFSET_RESET_CONFIG"))
  p.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kStreamsConf.getString("SCHEMA_REGISTRY_URL_CONFIG"))
  p
}

// adjusted output serdes for Avro records
val keySerde: Serde[String] = Serdes.String
val valSerde: Serde[GenericData.Record] = new GenericAvroSerde()
valSerde.configure(
  Collections.singletonMap(
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
    streamsConfiguration.get(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG)
  ),
  /* isKeySerde = */ false
)

// now using the adjusted serdes to write to the output topic
stream.to(keySerde, valSerde, "destTopic")
This way, it works like a charm.
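Note: the to(keySerde, valSerde, topic) overload comes from the older Kafka Streams API I'm using here. If you are on Kafka Streams 1.0 or newer, the per-sink serdes are passed via Produced instead; roughly (assuming the same keySerde/valSerde as above):

import org.apache.kafka.streams.kstream.Produced

// On Kafka Streams 1.0+ the serde overload of to() is replaced by Produced
stream.to("destTopic", Produced.`with`(keySerde, valSerde))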