Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka python Consumer group session timed out

I use confluent-kafka v1.3.0 and I have following problem with consumer group session timeout. My config looks like:

c['KAFKA'] = {
    'bootstrap.servers': 'host.docker.internal:9104',
    'consumer': {
        'group.id': 'consumer',
        'enable.auto.commit': True,
        'default.topic.config': {
            'auto.offset.reset': 'earliest
        },
        'heartbeat.interval.ms': 100000,
        'max.poll.interval.ms': 300000,
        'session.timeout.ms': 100000
    },
}

and logic in code like:

consumer.subscribe('database_changes')

with ThreadPoolExecutor(max_workers=500) as executor:
    while True:
        msg = consumer.poll(100)
        if msg is not None:
            executor.submit(process_message, msg)

Code in function process message waits a few ms because it's really simple logic. Everything works good but every moment I got this error:

{"asctime":"2020-04-27 08:42:25,759","levelname":"WARNING","name":"services.kafka","message":"SESSTMOUT [rdkafka#consumer-2] [thrd:main]: Consumer group session timed out (in join-state started) after 30131 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group"}

these rebalacing greatly hampers the whole process.

Does anyone have an idea of what can be wrongly set? I suspect a not working heartbeat, but I don't know how it verify or better fix.

Thanks

like image 206
XWizard Avatar asked Sep 08 '25 16:09

XWizard


1 Answers

As per the confluent kafka docs, the heartbeat.interval.ms should be set no higher than 1/3 of session.timeout.ms. Since after the sessions.timeout.ms value, the consumer is assumed dead, it is recommended to wait atleast three heartbeats to assume that.

like image 124
Himanshu Taneja Avatar answered Sep 10 '25 07:09

Himanshu Taneja