
How do I convert an Avro object to JSON and back? (when the Avro schema contains a union)

I'm trying to implement the outbox pattern, but I'm running into an issue with Avro and JSON.

I have a Java Quarkus application with an Avro schema containing a union of objects. What I want to do is:

  1. create the message object (the message to be sent)
  2. save it to the database in a human-readable format such as JSON
  3. then retrieve it from the database and send it to Kafka
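The three steps above, sketched in plain Java (an in-memory deque stands in for the outbox table, and the payload is kept as an opaque JSON string; all class and method names here are made up for illustration):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class OutboxSketch {

    // Stands in for the outbox table in the database.
    static final Deque<String> outboxTable = new ArrayDeque<>();

    // Step 2: persist the serialized message in a human-readable form.
    static void saveToOutbox(String json) {
        outboxTable.add(json);
    }

    // Step 3: a relay drains the table and hands each payload to Kafka
    // (here it just collects them so the flow is visible).
    static List<String> relayToKafka() {
        List<String> published = new ArrayList<>();
        while (!outboxTable.isEmpty()) {
            published.add(outboxTable.poll());
        }
        return published;
    }

    public static void main(String[] args) {
        // Step 1: build the message; here a hand-written JSON stand-in.
        String message = "{\"stuff\":\"hello\",\"otherNumber\":42}";
        saveToOutbox(message);
        System.out.println(relayToKafka()); // [{"stuff":"hello","otherNumber":42}]
    }
}
```

The interesting part is step 2/3: whatever format goes into the outbox table must round-trip losslessly back into the Avro object, which is where the union causes trouble.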

This is an anonymised example of my schema:

{
  "namespace": "com.acme.kafka",
  "type": "record",
  "name": "ValidatedUpdate",
  "fields": [
    {
      "name": "update",
      "type": {
        "type": "record",
        "name": "Update",
        "fields": [
          {
            "name": "createDate",
            "type": {
              "type": "long",
              "logicalType": "timestamp-millis"
            }
          },
          {
            "name": "details",
            "type": [
              {
                "name": "OtherUpdateDetails",
                "type": "record",
                "fields": [
                  {
                    "name": "stuff",
                    "type": "string"
                  },
                  {
                    "name": "otherNumber",
                    "type": "int"
                  }
                ]
              },
              {
                "name": "SpecialUpdateDetails",
                "type": "record",
                "fields": [
                  {
                    "name": "stuff",
                    "type": "string"
                  },
                  {
                    "name": "isYes",
                    "type": "boolean"
                  }
                ]
              }
            ]
          }
        ]
      }
    }
  ]
}

However, when I read the JSON from the database and map it with Jackson onto the generated Avro message object, everything seems fine. But when I then try to send it to Kafka, the serializer complains that it can't encode the message because the schema contains a union. What is the best way to fix this?
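For context (and assuming the schema above): Avro's JSON encoding wraps each non-null union value in an object keyed by the branch's full type name, while Jackson serializes the generated class as plain JSON without that wrapper, so the two representations are not interchangeable. Avro's JSON encoder would emit something like:

```json
{
  "update": {
    "createDate": 1729747200000,
    "details": {
      "com.acme.kafka.OtherUpdateDetails": {
        "stuff": "some text",
        "otherNumber": 7
      }
    }
  }
}
```

whereas Jackson writes "details" as the bare inner object, leaving the union branch ambiguous on the wire.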

I've looked into the jackson-dataformat dependency, but I couldn't figure out how to fix it that way. I've checked other Stack Overflow posts, but couldn't find exactly the issue I'm running into. I've also tried rebuilding parts of the Kafka message, but that didn't work either. Thank you all in advance for the help.

asked Oct 24 '25 by Nan0fire

2 Answers

UPDATE: I've found a solution, based on: https://www.baeldung.com/java-apache-avro

From Avro Class to JSON

public static <T extends SpecificRecord> String toJson(T avroObject) {
    DatumWriter<T> writer = new SpecificDatumWriter<>(avroObject.getSchema());
    try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
        Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(avroObject.getSchema(), stream);
        writer.write(avroObject, jsonEncoder);
        jsonEncoder.flush();
        return stream.toString();
    } catch (IOException e) {
        log.errorf("Serialization error: %s", e.getMessage());
    }
    return null;
}

From JSON (String) to Avro Class


public static SpecificRecord fromJson(String json, Schema schema) {
    DatumReader<SpecificRecord> reader = new SpecificDatumReader<>(schema);
    try {
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
        return reader.read(null, decoder);
    } catch (IOException e) {
        log.errorf("Deserialization error: %s json: %s", e.getMessage(), json);
    }
    return null;
}

This works for me, but feel free to suggest more elegant solutions.

Imports

import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
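A quick way to sanity-check the round trip without the generated classes is a generic-record variant of the same helpers (this is my own sketch, not part of the answer above; the schema string is a trimmed-down version of the question's schema with only the union field):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class AvroJsonRoundTrip {

    // Trimmed version of the question's schema: one record with a union field.
    static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"ValidatedUpdate\",\"namespace\":\"com.acme.kafka\","
      + "\"fields\":[{\"name\":\"details\",\"type\":["
      + "{\"type\":\"record\",\"name\":\"OtherUpdateDetails\",\"fields\":["
      + "{\"name\":\"stuff\",\"type\":\"string\"},{\"name\":\"otherNumber\",\"type\":\"int\"}]},"
      + "{\"type\":\"record\",\"name\":\"SpecialUpdateDetails\",\"fields\":["
      + "{\"name\":\"stuff\",\"type\":\"string\"},{\"name\":\"isYes\",\"type\":\"boolean\"}]}"
      + "]}]}");

    public static String toJson(GenericRecord record) throws IOException {
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            Encoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), out);
            new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
            encoder.flush();
            return out.toString("UTF-8");
        }
    }

    public static GenericRecord fromJson(String json, Schema schema) throws IOException {
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }

    public static void main(String[] args) throws IOException {
        // Build a record whose union field holds the OtherUpdateDetails branch.
        Schema branch = SCHEMA.getField("details").schema().getTypes().get(0);
        GenericRecord details = new GenericData.Record(branch);
        details.put("stuff", "hello");
        details.put("otherNumber", 42);
        GenericRecord update = new GenericData.Record(SCHEMA);
        update.put("details", details);

        // The union branch comes out keyed by its full type name.
        String json = toJson(update);
        System.out.println(json);

        GenericRecord back = fromJson(json, SCHEMA);
        System.out.println(((GenericRecord) back.get("details")).get("otherNumber"));
    }
}
```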
answered Oct 26 '25 by Nan0fire

You can achieve this with Jackson as well, by extending Jackson's ObjectMapper as below:

public class AvroJacksonObjectMapper extends ObjectMapper
{
  public AvroJacksonObjectMapper()
  {
    super();
    registerModule(new JavaTimeModule());
    configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
    addMixIn(SpecificRecordBase.class, SpecificRecordBaseMixIn.class);
  }

  // Mix-in that hides Avro's bookkeeping getters from Jackson,
  // so only the actual record fields are (de)serialized.
  public abstract static class SpecificRecordBaseMixIn
  {
    @JsonIgnore
    abstract void getSchema();

    @JsonIgnore
    abstract void getSpecificData();
  }

}

Now use this object mapper to serialize Avro objects to a JSON string and vice versa.

// AvroObject is a class generated from the Avro schema

ObjectMapper objectMapper = new AvroJacksonObjectMapper();
AvroObject o = objectMapper.readValue("json string", AvroObject.class);
String jsonString = objectMapper.writeValueAsString(o);
answered Oct 26 '25 by Manmay

