I have following program to consume all the messages coming to Kafka.
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'])
for message in consumer:
    consumer.commit()
    print ("%s key=%s value=%s" % (message.topic,message.key,
                                          message.value))
KafkaConsumer.close()
Using above program i am able to consume all the messages coming to Kafka. But once all messages are consumed, i want to close the kafka consumer which is not happening. I need help in same.
Stopping a Kafka Consumer We can use rest api to stop a running consumer. However, we need consumer id to stop the running consumer, so the consumer id needs to be sent. Then try to access the POST http://localhost:8080/api/kafka/registry/deactivate by sending the id parameter of the consumer you want to stop.
Always close() the consumer before exiting. This will close the network connections and sockets.
You can use consumer. assignment() , it will return set of partitions and verify whether all of the partitions are assigned which are available for that topic.
A consumer can be assigned to consume multiple partitions. So the rule in Kafka is only one consumer in a consumer group can be assigned to consume messages from a partition in a topic and hence multiple Kafka consumers from a consumer group can not read the same message from a partition.
I am able to close kafka consumer now if i provide consumer_timeout_ms argument to KafkaConsumer object. It accepts timeout value in millisecond. Below is the code snippet.
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'],
                         consumer_timeout_ms=1000)
for message in consumer:
    consumer.commit()
    print ("%s key=%s value=%s" % (message.topic,message.key,
                                          message.value))
KafkaConsumer.close()
In above code if consumer doesn't see any message for 1 second it will close the session.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With