 

Flink deserialize Kafka JSON

I am trying to read a JSON message from a Kafka topic with Flink.

I am using Kafka 2.4.1 and Flink 1.10.

For my consumer I have set:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;


FlinkKafkaConsumer<ObjectNode> sensorConsumer = new FlinkKafkaConsumer<>(KAFKA_TOPIC_INPUT,
                new JSONKeyValueDeserializationSchema(false), properties);

When I use SimpleStringSchema I get the JSON as text, which is fine, but with JSONKeyValueDeserializationSchema I get:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sensor_5': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

sensor_5 would be a key in the topic. I am guessing that I need to do something else so that only the JSON from the Kafka message value is fed to the deserializer, and handle the key somehow, but I am not sure what.

Any suggestions?

The JSON structure is:

{"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}

and it is submitted via

# Python 3
import json
from confluent_kafka import Producer

dict_obj = {"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}
producer = Producer({'bootstrap.servers': "kafka:9092"})

producer.produce(topic='sensors-raw', key='sensor_5', value=json.dumps(dict_obj))

KillerSnail asked Oct 22 '25 15:10

2 Answers

So, basically, if you take a look at the source code of JSONKeyValueDeserializationSchema, you can see that it looks like this:

    if (mapper == null) {
        mapper = new ObjectMapper();
    }
    ObjectNode node = mapper.createObjectNode();
    if (record.key() != null) {
        node.set("key", mapper.readValue(record.key(), JsonNode.class));
    }
    if (record.value() != null) {
        node.set("value", mapper.readValue(record.value(), JsonNode.class));
    }
    if (includeMetadata) {
        node.putObject("metadata")
            .put("offset", record.offset())
            .put("topic", record.topic())
            .put("partition", record.partition());
    }
    return node;

So, generally, the schema expects that your key is JSON, not a plain String, thus it will fail for sensor_5. I think the best and simplest solution would be to create your own implementation that treats the key as a String.
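The failure can be reproduced outside Flink with a small Python sketch that mirrors the schema's logic above (the function name and dict layout are illustrative, not part of the Flink API):

```python
import json

def json_key_value_deserialize(key, value):
    """Mirror of JSONKeyValueDeserializationSchema: both the record
    key and the record value are parsed as JSON."""
    node = {}
    if key is not None:
        node["key"] = json.loads(key)    # raises for a raw key like b'sensor_5'
    if value is not None:
        node["value"] = json.loads(value)
    return node

# The value on its own parses fine:
record_value = b'{"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}'
print(json_key_value_deserialize(None, record_value))

# But the raw key is not valid JSON, which matches the JsonParseException:
try:
    json_key_value_deserialize(b"sensor_5", record_value)
except json.JSONDecodeError as e:
    print("key failed to parse:", e)
```

The token `sensor_5` is rejected for the same reason Jackson rejects it: a bare word is not a JSON String, Number, Array, Object, or literal.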

Dominik Wosiński answered Oct 25 '25 06:10


You can implement DeserializationSchema instead of KeyedDeserializationSchema if you don't want to include your key in your record.

An example would be like the following:

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

public class JSONValueDeserializationSchema implements DeserializationSchema<ObjectNode> {

    private static final long serialVersionUID = -1L;

    // Lazily initialized, because ObjectMapper is not serializable.
    private transient ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (message != null) {
            node.set("value", mapper.readValue(message, JsonNode.class));
        }
        return node;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeExtractor.getForClass(ObjectNode.class);
    }
}

If you want to include the key as well in your record, you can implement KeyedDeserializationSchema as mentioned in the answer by Dominik Wosiński.
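Alternatively, if you control the producer, you can keep JSONKeyValueDeserializationSchema unchanged by making the key itself valid JSON, i.e. JSON-encoding the key string on the producer side. A sketch of the idea with the standard json module (the produce call stays the same as in the question, only the key changes):

```python
import json

# JSON-encode the key so it becomes the JSON string '"sensor_5"',
# which JSONKeyValueDeserializationSchema can parse as a JSON value.
key = json.dumps("sensor_5")
print(key)            # "sensor_5" (with quotes, i.e. valid JSON)
print(json.loads(key))  # round-trips back to the plain string sensor_5

# producer.produce(topic='sensors-raw', key=key, value=json.dumps(dict_obj))
```

The trade-off is that every consumer of the topic then sees a quoted key, so the custom DeserializationSchema above is usually the cleaner fix.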

damjad answered Oct 25 '25 06:10