I am trying to read a JSON message from a Kafka topic with Flink.
I am using Kafka 2.4.1 and Flink 1.10.
For my consumer I have set:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
FlinkKafkaConsumer<ObjectNode> sensorConsumer = new FlinkKafkaConsumer<>(KAFKA_TOPIC_INPUT,
        new JSONKeyValueDeserializationSchema(false), properties);
When I use SimpleStringSchema I get the JSON as text, which is fine, but with the JSONKeyValueDeserializationSchema I get:
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sensor_5': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
sensor_5 is the key of the Kafka message. I am guessing that I need to add something else so that only the JSON from the Kafka message value is fed to the deserializer and the key is handled separately, but I am not sure how.
Any suggestions?
The json structure is:
{"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}
and it is submitted via
# Python 3
import json
from confluent_kafka import Producer

dict_obj = {"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}

producer = Producer({'bootstrap.servers': "kafka:9092"})
producer.produce(topic='sensors-raw', key='sensor_5', value=json.dumps(dict_obj))
producer.flush()  # make sure the message is actually delivered before the script exits
So, basically, if you take a look at the source code of JSONKeyValueDeserializationSchema, you can see that it looks like the following:
if (mapper == null) {
    mapper = new ObjectMapper();
}
ObjectNode node = mapper.createObjectNode();
if (record.key() != null) {
    node.set("key", mapper.readValue(record.key(), JsonNode.class));
}
if (record.value() != null) {
    node.set("value", mapper.readValue(record.value(), JsonNode.class));
}
if (includeMetadata) {
    node.putObject("metadata")
        .put("offset", record.offset())
        .put("topic", record.topic())
        .put("partition", record.partition());
}
return node;
So, generally, the schema expects the key to be JSON rather than a plain String, which is why it fails for sensor_5. I think the best and simplest solution would be to create your own implementation that accepts a plain String key.
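A minimal sketch of that idea, assuming the KafkaDeserializationSchema interface from Flink 1.10 (which is what JSONKeyValueDeserializationSchema itself implements); the class name is made up, and the only real change from the quoted source is that the key is stored as text instead of being parsed as JSON:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical name: behaves like JSONKeyValueDeserializationSchema, but treats the key as a plain String.
public class StringKeyJSONValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> {
    private static final long serialVersionUID = 1L;
    private transient ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (record.key() != null) {
            // The key ("sensor_5") is not JSON, so store it as a plain text field.
            node.put("key", new String(record.key(), StandardCharsets.UTF_8));
        }
        if (record.value() != null) {
            // The value is the JSON payload, so parse it as the original schema does.
            node.set("value", mapper.readValue(record.value(), JsonNode.class));
        }
        return node;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeExtractor.getForClass(ObjectNode.class);
    }
}

Such a schema can then be passed to the FlinkKafkaConsumer constructor in place of the JSONKeyValueDeserializationSchema.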
If you don't want to include the key in your record, you can implement DeserializationSchema instead of KeyedDeserializationSchema.
An example would look like the following:
import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

public class JSONValueDeserializationSchema implements DeserializationSchema<ObjectNode> {
    private static final long serialVersionUID = -1L;
    private transient ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (message != null) {
            // Parse the Kafka message value (the JSON payload) and store it under "value".
            node.set("value", mapper.readValue(message, JsonNode.class));
        }
        return node;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeExtractor.getForClass(ObjectNode.class);
    }
}
If you want to include the key as well in your record, you can implement KeyedDeserializationSchema as mentioned in the answer by Dominik Wosiński.
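For completeness, here is a rough sketch of how the value-only schema above could be plugged into the consumer. It assumes the KAFKA_TOPIC_INPUT and properties variables from the question and is only illustrative:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// KAFKA_TOPIC_INPUT and properties come from the question's setup.
FlinkKafkaConsumer<ObjectNode> sensorConsumer = new FlinkKafkaConsumer<>(
        KAFKA_TOPIC_INPUT, new JSONValueDeserializationSchema(), properties);

DataStream<ObjectNode> sensorStream = env.addSource(sensorConsumer);

// Each element looks like {"value": {"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}}.
sensorStream
        .map(node -> node.get("value").get("value").asDouble())
        .returns(Types.DOUBLE)
        .print();

env.execute("sensors-raw json job");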