I'm trying to implement the outbox pattern, but I'm running into an issue with Avro and Json.
I have a java quarkus application with an avro schema containing a union of objects. What I want to do is:
This is an anonymised example of my schema.:
{
"namespace": "com.acme.kafka",
"type": "record",
"name": "ValidatedUpdate",
"fields": [
{
"name": "update",
"type": {
"type": "record",
"name": "Update",
"fields": [
{
"name": "createDate",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "details",
"type": [
{
"name": "OtherUpdateDetails",
"type": "record",
"fields": [
{
"name": "stuff",
"type": "string"
},
{
"name": "otherNumber",
"type": "int"
}
]
},
{
"name": "SpecialUpdateDetails",
"type": "record",
"fields": [
{
"name": "stuff",
"type": "string"
},
{
"name": "isYes",
"type": "boolean"
}
]
}
]
}
]
}
}
]
}
However when I read the json from the database, map it with jackson to the generated avro message object, all seems fine, but when I try to send it to kafka, it complains about not being able to send it on account of containing a union. What is the best way to fix this?
I've looked into jackson dataformat dependency, but I couldn't figure it out how to fix it. I've checked out other stackoverflow posts, but couldn't find exactly what I'm running into. I've tried to rebuild parts of the kafka message, but that didn't work either. Thank you all in advance for the help.
UPDATE: I've found a solution, based on: https://www.baeldung.com/java-apache-avro
From Avro Class to JSON
public static <T extends SpecificRecord> String toJson(T avroObject) {
DatumWriter<T> writer = new SpecificDatumWriter<>(avroObject.getSchema());
try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(avroObject.getSchema(), stream);
writer.write(avroObject, jsonEncoder);
jsonEncoder.flush();
return stream.toString();
} catch (IOException e) {
log.errorf("Serialization error:", e.getMessage());
}
return null;
}
From JSON (String) to Avro Class
public static SpecificRecord fromJson(String json, Schema schema) {
DatumReader<SpecificRecord> reader = new SpecificDatumReader<>(schema);
try {
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
return reader.read(null, decoder);
} catch (IOException e) {
log.errorf("Deserialization error: %s json: %s", e.getMessage(), json);
}
return null;
}
This works for me, but feel free to suggest more elegant solutions.
Imports
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
You can achieve it using jackson as well by extending jackson's ObjectMapper as below:
public class AvroJacksonObjectMapper extends ObjectMapper
{
public AvroJacksonObjectMapper()
{
super();
registerModule(new JavaTimeModule());
configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
addMixIn(SpecificRecordBase.class, SpecificRecordBaseMixIn.class);
}
public abstract class SpecificRecordBaseMixIn
{
@JsonIgnore
abstract void getSchema();
@JsonIgnore
abstract void getSpecificData();
}
}
Now use this object mapper to serialize avro objects to json string or vice versa.
// AvroObject is a class generated out of Avro Schema
ObjectMapper objectMapper = new AvroObjectMapper();
AvroObject o = objectMapper.readValue("json string", AvroObject.class);
String jsonString = objectMapper.writeValueAsString(avroObject);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With