I use confluent-kafka v1.3.0 and I have the following problem with the consumer group session timeout. My config looks like this:
c['KAFKA'] = {
    'bootstrap.servers': 'host.docker.internal:9104',
    'consumer': {
        'group.id': 'consumer',
        'enable.auto.commit': True,
        'default.topic.config': {
            'auto.offset.reset': 'earliest'
        },
        'heartbeat.interval.ms': 100000,
        'max.poll.interval.ms': 300000,
        'session.timeout.ms': 100000
    },
}
and the logic in the code looks like this:
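from concurrent.futures import ThreadPoolExecutor

# subscribe() expects a list of topic names
consumer.subscribe(['database_changes'])
with ThreadPoolExecutor(max_workers=500) as executor:
    while True:
        # wait up to 100 seconds for the next message
        msg = consumer.poll(100)
        if msg is not None:
            executor.submit(process_message, msg)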
The code in the process_message function takes only a few ms because the logic is really simple. Everything works well, but every so often I get this error:
{"asctime":"2020-04-27 08:42:25,759","levelname":"WARNING","name":"services.kafka","message":"SESSTMOUT [rdkafka#consumer-2] [thrd:main]: Consumer group session timed out (in join-state started) after 30131 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group"}
These rebalances greatly hamper the whole process.
Does anyone have an idea of what might be set incorrectly? I suspect the heartbeat is not working, but I don't know how to verify that or, better, how to fix it.
Thanks
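As per the Confluent Kafka docs, heartbeat.interval.ms should be set no higher than 1/3 of session.timeout.ms. The consumer is assumed dead once session.timeout.ms elapses without a heartbeat, so it is recommended to leave room for at least three heartbeats before that happens. In the config from the question both values are 100000, so a single delayed heartbeat is enough to exceed the session timeout.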
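As a rough sketch of that rule, the heartbeat interval can be derived from the session timeout instead of being set by hand. The helper name build_consumer_config and the 30000 ms default are assumptions chosen for illustration, not values taken from the docs or from the question; the remaining keys are copied from the question's config, flattened into the single dict that Consumer takes.

```python
def build_consumer_config(session_timeout_ms=30000):
    """Hypothetical helper: keep heartbeat.interval.ms at 1/3 of session.timeout.ms."""
    return {
        'bootstrap.servers': 'host.docker.internal:9104',
        'group.id': 'consumer',
        'enable.auto.commit': True,
        'auto.offset.reset': 'earliest',
        # assumed illustrative value; tune it for your own cluster
        'session.timeout.ms': session_timeout_ms,
        # three heartbeats fit inside one session timeout
        'heartbeat.interval.ms': session_timeout_ms // 3,
        'max.poll.interval.ms': 300000,
    }


config = build_consumer_config()

# With confluent-kafka installed, the dict is passed straight to the consumer:
# from confluent_kafka import Consumer
# consumer = Consumer(config)
```

Deriving one value from the other means the ratio still holds if the session timeout is changed later.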