We are batch consuming with Kafka: we consume X messages, insert them into MySQL, then commit. From time to time we get partial insertions into MySQL (duplicate records, other failures, etc.). We are using this example from the docs:
```java
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
    }
}
```
We want to commit only the successfully inserted records and have Kafka replay the failures. But I can't see how to do this, as the API only offers commitSync() over the whole batch. Ideas?
In Kafka you don't commit specific records, i.e. you can't mark offset N as processed and offset N-1 as not processed. Instead, by committing offset N you indicate you've processed all records up to N.
Things you can do when failing to process offset N:
Commit N-1 (using commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets)) and retry processing offset N, as you still have it in memory. Only once N has successfully been processed do you commit N and move on to newer records.
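A minimal sketch of the bookkeeping behind this (plain Java, no Kafka dependencies, so the broker interaction is out of scope; in real code the result would be wrapped in an `OffsetAndMetadata` keyed by `TopicPartition` and passed to `commitSync(Map)`): given the offsets polled for one partition and the subset that made it into MySQL, the committable offset is one past the highest offset in the unbroken run of successes, since Kafka's committed offset is the position of the next record to read.

```java
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;

public class CommitBookkeeping {
    /**
     * Given the (ascending) offsets polled for one partition and the set of
     * offsets successfully written to the database, return the offset to pass
     * to commitSync: one past the last offset in the unbroken prefix of
     * successes. Empty if even the first record failed (commit nothing).
     */
    static OptionalLong committableOffset(List<Long> polled, Set<Long> succeeded) {
        long lastOk = -1;
        for (long offset : polled) {
            if (!succeeded.contains(offset)) break; // first failure ends the committable run
            lastOk = offset;
        }
        // Kafka's committed offset is the next record to read, hence +1
        return lastOk < 0 ? OptionalLong.empty() : OptionalLong.of(lastOk + 1);
    }

    public static void main(String[] args) {
        List<Long> polled = List.of(5L, 6L, 7L, 8L);
        Set<Long> ok = Set.of(5L, 6L, 8L); // offset 7 failed to insert
        // Committing 7 means 5 and 6 are done; 7 (and 8) will be replayed.
        System.out.println(committableOffset(polled, ok)); // OptionalLong[7]
    }
}
```

Note that offset 8 succeeded but cannot be acknowledged on its own: committing past 7 would lose the failed record, which is exactly why per-record acknowledgement doesn't exist in this API.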
Assuming you run a Sink Connector in Kafka Connect, upon failure to process N you can forward the record to Connect's Dead Letter Queue. Otherwise, push it to another topic for later processing. This temporarily skips offset N (you could also drop it, if that's an option).
You can also mix both approaches: attempt a few retries, but if the record still can't be processed, save or drop it and keep processing newer records.
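The mixed strategy above can be sketched like this (a simulation with plain Java; the `tryProcess` predicate stands in for your MySQL insert, and the `parked` list stands in for producing to a dead-letter or retry topic, both of which are assumptions, not part of the Kafka API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class RetryThenPark {
    static final int MAX_ATTEMPTS = 3; // bounded retries before giving up on a record

    /**
     * Try each record up to MAX_ATTEMPTS times; records that still fail are
     * collected into "parked" (in practice: produced to a dead-letter topic)
     * so the consumer can commit the batch and move on to newer offsets
     * instead of blocking the whole partition on one bad record.
     */
    static <T> List<T> processWithRetries(List<T> batch, Predicate<T> tryProcess, List<T> parked) {
        for (T record : batch) {
            boolean done = false;
            for (int attempt = 1; attempt <= MAX_ATTEMPTS && !done; attempt++) {
                done = tryProcess.test(record);
            }
            if (!done) parked.add(record); // give up: park for later handling
        }
        return parked;
    }

    public static void main(String[] args) {
        List<String> parked = new ArrayList<>();
        // Simulated insert: "dup" always fails (think: duplicate-key error)
        processWithRetries(List.of("a", "dup", "b"), r -> !r.equals("dup"), parked);
        System.out.println(parked); // [dup]
    }
}
```

Because every record in the batch ends up either processed or parked, it is safe to commit the batch's last offset afterwards; nothing is silently lost.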