
Kafka Connect transformations: parse an input string and get a record key

I use a simple file source reader

connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1

Each line of the file is a simple JSON object. I found that a transformation can be used to replace the record key, like this:

# Add the `ip` field as the key using Single Message Transforms
transforms=InsertKey

# `ValueToKey`: copy one of the value's fields (`ip`) into the key
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=ip

But I got this error:

Only Struct objects supported for [copying fields from value to key], found: java.lang.String

Is there a way to parse the JSON string and extract a key from it, as I can with Flume and its regex_extractor?

Sergei Grigorev asked Sep 15 '25 23:09



1 Answer

When you use a transformation on a source connector, it is applied to each record in the List<SourceRecord> returned by SourceTask.poll().

In your case, the FileStreamSourceConnector reads the file line by line and puts each line into a SourceRecord as a plain String. When the transformation receives the SourceRecord, it therefore sees only a String and knows nothing about the structure of the JSON inside it.

To solve this problem, you can do one of the following:

  1. Modify the FileStreamSourceConnector code so that it returns each SourceRecord with a Struct value and a matching Schema for your input JSON string. You can use Kafka's SchemaBuilder class for this.
  2. If you consume this data with a sink connector, have Kafka Connect parse it as JSON by setting the following config on the sink connector, and then apply the transformations on the sink connector.

"value.converter":"org.apache.kafka.connect.json.JsonConverter"
"value.converter.schemas.enable": "false"
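
As a rough sketch of the sink-side setup, the converter settings above could be combined with the ValueToKey transform from the question. This is in properties form, and the connector name, topic, output file, and the choice of FileStreamSinkConnector are placeholders for illustration only, not something from the original setup. On the sink side the converter runs before the transforms, so ValueToKey receives a parsed Map instead of a String.

```properties
# Hypothetical sink connector config: name, topics, and file are placeholders.
name=json-lines-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=json-lines
file=/tmp/json-lines.out

# Parse each record value as schemaless JSON, so the transform sees a Map.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# Copy the `ip` field from the parsed value into the record key.
transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=ip
```

With this config the key becomes a small map holding the `ip` field. If a bare value is needed, chaining org.apache.kafka.connect.transforms.ExtractField$Key after ValueToKey is one way to unwrap it.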

If you go with the second option, remember to set the following on your source connector:

"value.converter":"org.apache.kafka.connect.storage.StringConverter"
"value.converter.schemas.enable": "false"
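
For reference, a source connector config along these lines might look as follows in properties form. The connector name, input file path, and topic are assumptions made for the sake of the example. The point is that StringConverter writes each line to the topic as-is, so a JSON-aware converter downstream can parse it.

```properties
# Hypothetical source connector config: name, file, and topic are placeholders.
name=json-lines-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/input.jsonl
topic=json-lines

# Write each line to Kafka as a raw string, without JSON-encoding it again.
value.converter=org.apache.kafka.connect.storage.StringConverter
```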

Harry answered Sep 17 '25 16:09
