
How to partially commitSync when consuming a batch in Kafka

We are batch consuming with Kafka: we consume X messages, insert them into MySQL, then commit.

From time to time we get partial insertions into MySQL (duplicate records, other failures, etc.).

We are using this example from the docs:

    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }
        if (buffer.size() >= minBatchSize) {
            insertIntoDb(buffer);
            consumer.commitSync();
            buffer.clear();
        }
    }

We want to commitSync only the successful records and have Kafka replay the failures.

But I can't see how to do this, since the API only offers commitSync() over the whole batch.

Ideas?

asked Sep 06 '25 by rayman


1 Answer

In Kafka you don't commit specific records, i.e. you can't mark offset N as processed and offset N-1 as not processed. Instead, by committing offset N, you indicate that you've processed all records up to N.
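
For reference, the finest granularity the consumer API gives you is a committed position per partition. A minimal sketch of that call (names are illustrative); note that the committed value is the offset of the next record you want to read, hence the + 1:

    // Acknowledge everything up to and including this record on its partition.
    // The committed offset is the NEXT position to consume, so it's offset() + 1.
    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    consumer.commitSync(Collections.singletonMap(
            tp, new OffsetAndMetadata(record.offset() + 1)));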

Things you can do when failing to process offset N:

  • Commit N-1 (using commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets)) and retry processing offset N, since you still have it in memory. Only once N has successfully been processed do you commit N and move on to newer records (see the sketch after this list).

  • If you run as a Sink Connector in Kafka Connect, upon failing to process N you can forward the record to Connect's Dead Letter Queue. Otherwise, push it to another topic for later processing. This temporarily skips offset N (you could also drop it if that's an option).
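
A rough sketch of the first option, dropped into a per-record version of the loop from the question (insertIntoDb here is a hypothetical per-record insert, not the poster's batch method; all names are illustrative):

    for (ConsumerRecord<String, String> record : records) {
        try {
            insertIntoDb(record); // hypothetical per-record insert
        } catch (Exception e) {
            // Everything before this record succeeded. Commit this record's own
            // offset ("the next record to read is this one"), so a crash during
            // the retries makes Kafka replay exactly this record.
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            consumer.commitSync(Collections.singletonMap(
                    tp, new OffsetAndMetadata(record.offset())));

            // Retry from memory until it goes through, then carry on.
            boolean done = false;
            while (!done) {
                try {
                    insertIntoDb(record);
                    done = true;
                } catch (Exception retryFailure) {
                    // back off / log and retry
                }
            }
        }
    }
    // Whole poll processed: commit the consumer's current position.
    consumer.commitSync();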

You can also mix both approaches: attempt a few retries, and if the record still can't be processed, save or drop it and keep processing newer records.
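
A minimal sketch of that mix, assuming a separate KafkaProducer and a hypothetical "failed-inserts" topic acting as the dead letter queue (the retry count and all names are illustrative):

    for (ConsumerRecord<String, String> record : records) {
        boolean processed = false;
        for (int attempt = 0; attempt < 3 && !processed; attempt++) {
            try {
                insertIntoDb(record); // hypothetical per-record insert
                processed = true;
            } catch (Exception e) {
                // log and retry
            }
        }
        if (!processed) {
            // Park the record on a dead letter topic so the main flow can move on.
            producer.send(new ProducerRecord<>("failed-inserts", record.key(), record.value()));
        }
        // Either way this offset is now handled; commit past it.
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        consumer.commitSync(Collections.singletonMap(
                tp, new OffsetAndMetadata(record.offset() + 1)));
    }

In practice you would commit less often (for example once per poll) rather than after every record; committing per record above is only to make the offset bookkeeping explicit.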

answered Sep 08 '25 by Mickael Maison