I am new to Kafka and am running into the following scenario. I have an inbound and an outbound Kafka queue. The app reads a message from the inbound queue, processes it (calling 10 downstream services) and, on success, puts the message in the outbound queue and then commits the message in the inbound queue.
WARN [kafka-coordinator-heartbeat-thread] [Consumer clientId=604dd51a-9b36-4490-aa80-51125bafb465, groupId=abc] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
INFO [kafka-coordinator-heartbeat-thread] - [Consumer clientId=604dd51a-9b36-4490-aa80-51125bafb465, groupId=abc] Member 604dd51a-9b36-4490-aa80-51125bafb465-e0 sending LeaveGroup request to coordinator zk2-abc.com:9092 (id: 214748 rack: null)
There are two problems I am facing:
I understand the suggested solution is to set "max.poll.interval.ms" to at least the time required to process a message. I know the processing time when things are going well, but not when a hard dependency fails, since I then have to wait for the dependent service to respond (with retries). I could set it to the maximum available value, but I am not sure whether that is a good approach or what the implications are at the Kafka level.
What I tried,
I want to understand what possible solutions I have for handling the above problem, and the pros and cons of each.
You could create another thread specifically for the message processing, so the consumer thread won't ever delay when calling poll().
You could create a pool of processing threads (big enough not to slow down the consumer thread when it hands off a new message), or absorb the backpressure in an internal structure, such as a ConcurrentHashMap or some kind of synchronized Deque/Queue with limited size. Your "processor" threads will do the hard work, so the consumer is able to call poll() in time.
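To make the hand-off idea concrete, here is a minimal sketch using only the JDK, with no Kafka client involved. The class name `HandOffSketch`, the `run` method, and the plain `String` records are all made up for illustration; the `for` loop over `records` stands in for the consumer's poll loop, and the worker body is where the slow downstream calls would go.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a "consumer" loop hands records to worker threads
// through a bounded queue, so the slow work never runs on the polling thread.
class HandOffSketch {
    // Sentinel telling a worker to stop; compared by identity, so it cannot collide with real data.
    private static final String STOP = new String("stop");

    /** Pushes every record through the bounded queue and returns how many the workers processed. */
    static int run(List<String> records, int workers, int queueCapacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                try {
                    for (String r = queue.take(); r != STOP; r = queue.take()) {
                        // The slow part (downstream calls, retries) would happen here.
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            // Stand-in for the poll loop: offer with a short timeout instead of blocking forever.
            for (String record : records) {
                while (!queue.offer(record, 10, TimeUnit.MILLISECONDS)) {
                    // Queue is full: this is the backpressure point. A real consumer
                    // would keep calling poll() here rather than spin.
                }
            }
            for (int i = 0; i < workers; i++) {
                queue.put(STOP); // one stop marker per worker
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c", "d", "e"), 2, 2));
    }
}
```

With a real KafkaConsumer, the full-queue branch is where you would probably call pause() on the assigned partitions and keep polling, then resume() once the workers catch up. Offsets should only be committed after a worker has actually finished the record, which this sketch does not try to model.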
Hope it's helpful, writing it in a rush!
This is taken from the Kafka Consumer documentation:
We have intentionally avoided implementing a particular threading model for processing. This leaves several options for implementing multi-threaded processing of records.
- One Consumer Per Thread
A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach:
PRO: It is the easiest to implement
PRO: It is often the fastest as no inter-thread co-ordination is needed
PRO: It makes in-order processing on a per-partition basis very easy to implement (each thread just processes messages in the order it receives them).
CON: More consumers means more TCP connections to the cluster (one per thread). In general Kafka handles connections very efficiently so this is generally a small cost.
CON: Multiple consumers means more requests being sent to the server and slightly less batching of data which can cause some drop in I/O throughput.
CON: The number of total threads across all processes will be limited by the total number of partitions.
- Decouple Consumption and Processing
Another alternative is to have one or more consumer threads that do all data consumption and hands off ConsumerRecords instances to a blocking queue consumed by a pool of processor threads that actually handle the record processing. This option likewise has pros and cons:
PRO: This option allows independently scaling the number of consumers and processors. This makes it possible to have a single consumer that feeds many processor threads, avoiding any limitation on partitions.
CON: Guaranteeing order across the processors requires particular care as the threads will execute independently an earlier chunk of data may actually be processed after a later chunk of data just due to the luck of thread execution timing. For processing that has no ordering requirements this is not a problem.
CON: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure that processing is complete for that partition.
There are many possible variations on this approach. For example each processor thread can have its own queue, and the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify commit.
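The last variation (one queue per processor thread, chosen by hashing the partition) can be sketched with plain JDK classes. Everything named here is an assumption for illustration: `PartitionRoutingSketch`, its `run` method, and the use of a `String` such as "in-0" in place of a real TopicPartition. Each single-thread executor plays the role of a processor thread with its own queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: records are routed to a worker by hashing their partition,
// so all records of one partition are handled by the same thread, in order.
class PartitionRoutingSketch {

    /** Simulates interleaved records from several partitions; returns the offsets seen per partition, in processing order. */
    static Map<String, List<Long>> run(List<String> partitions, long recordsPerPartition, int workers) {
        // A single-thread executor is a worker thread plus its own FIFO queue.
        List<ExecutorService> lanes = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
        Map<String, List<Long>> processed = new ConcurrentHashMap<>();

        // Stand-in for the consumer thread: records arrive interleaved across partitions.
        for (long offset = 0; offset < recordsPerPartition; offset++) {
            for (String tp : partitions) {
                final long o = offset;
                // Same partition always maps to the same lane.
                ExecutorService lane = lanes.get(Math.floorMod(tp.hashCode(), workers));
                lane.execute(() -> processed
                        .computeIfAbsent(tp, k -> Collections.synchronizedList(new ArrayList<Long>()))
                        .add(o));
            }
        }

        try {
            for (ExecutorService lane : lanes) {
                lane.shutdown();
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("in-0", "in-1", "in-2"), 5, 2));
    }
}
```

Because each partition is owned by exactly one worker, that worker's latest finished offset for the partition is a safe position to commit, which is roughly what "simplify commit" means in the quoted text. How the offsets get back to the consumer thread for the actual commit is left out of this sketch.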