I use a simple file source reader
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
File content is a simple JSON object in each line. I found that there is a way to replace a record key and use transformations to do this, like
# Add the `id` field as the key using Simple Message Transformations
transforms=InsertKey
# `ValueToKey`: push an object of one of the column fields (`id`) into the key
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=ip
But I got an error
Only Struct objects supported for [copying fields from value to key], found: java.lang.String
Is there a way to parse string json and get a key from there like I can do with Flume and regex_extractor?
When using Transformation on SourceConnector, the transformation is done on the List<SourceRecord>
that is returned by the SourceConnector.poll()
In your case, the FileStreamSourceConnector
reads the lines of the file and puts each line as a String in the SourceRecord
object. Therefore, when the transformation gets the SourceRecord
, it only sees it as a String and does not know the structure of the object.
To solve this problem,
FileStreamSourceConnector
code so that it returns the SourceRecord
with a valid Struct and Schema of your input json String. You can use the Kafka's SchemaBuilder class for this."value.converter":"org.apache.kafka.connect.json.JsonConverter"
"value.converter.schemas.enable": "false"
If you go with the 2nd option, don't forget to put these configs on your SourceConnector.
"value.convertor":"org.apache.kafka.connect.storage.StringConverter"
"value.converter.schemas.enable": "false"
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With