I know there has to be a way to do this, but I have not been able to figure it out. I need to stop the Kafka consumer once I have read all the messages from the queue.
Can somebody provide any info on this?
Produce a message to a Kafka topic. Repeat the same step to publish more messages. The message published to Kafka has a null key. Press Ctrl+C to exit.
You can use ConsumerGroupCommand to check whether a consumer group has finished processing all messages in a particular topic. Zero lag for every partition indicates that the messages have been consumed successfully and that the consumer has committed their offsets.
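As a rough illustration of what "zero lag for every partition" means, here is a minimal Python sketch. It does not talk to Kafka. The function names (partition_lag, fully_consumed) and the plain dictionaries of offsets are assumptions made for this example; in practice the numbers would come from the output of the consumer group tool or from a client library.

```python
from typing import Dict, Tuple

# (topic, partition number); a hypothetical key type for this sketch
Partition = Tuple[str, int]


def partition_lag(end_offsets: Dict[Partition, int],
                  committed_offsets: Dict[Partition, int]) -> Dict[Partition, int]:
    """Lag per partition: log end offset minus the committed offset."""
    lag = {}
    for tp, end in end_offsets.items():
        committed = committed_offsets.get(tp)
        if committed is None:
            # Simplifying assumption: no committed offset means nothing
            # in this partition has been consumed yet.
            lag[tp] = end
        else:
            lag[tp] = max(end - committed, 0)
    return lag


def fully_consumed(end_offsets: Dict[Partition, int],
                   committed_offsets: Dict[Partition, int]) -> bool:
    """True when every partition has zero lag."""
    return all(v == 0 for v in partition_lag(end_offsets, committed_offsets).values())
```

With this check, a consumer (or a supervising script) could decide to shut down once fully_consumed returns True, keeping in mind that a producer may still append messages afterwards.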
Stopping a Kafka Consumer
We can use a REST API to stop a running consumer. To do so, we need the id of the consumer we want to stop. Send a POST request to http://localhost:8080/api/kafka/registry/deactivate with the id parameter of that consumer.
Stop the Kafka broker with Ctrl-C.
Currently, Kafka 2.1.1 (the kafka_2.11-2.1.1 distribution, built for Scala 2.11) has a script called kafka-console-consumer.sh.
It has a new flag: --timeout-ms.
Basically, this flag sets the maximum time the consumer waits for a new message before exiting. The value is in milliseconds.
You can use this option to end your console consumer after it has read all the messages.
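The same "exit after an idle timeout" idea can be sketched in Python for a custom consumer loop. This is a minimal sketch under stated assumptions, not the console consumer's implementation: consume_until_idle is a hypothetical helper, and poll stands for any function that returns the next message or None when nothing arrived (with a real client it would be expected to block briefly rather than return immediately).

```python
import time
from typing import Any, Callable, Optional


def consume_until_idle(poll: Callable[[], Optional[Any]],
                       handle: Callable[[Any], None],
                       timeout_ms: int,
                       clock: Callable[[], float] = time.monotonic) -> int:
    """Process messages until none has arrived for timeout_ms milliseconds.

    Returns the number of messages handled.
    """
    handled = 0
    last_message_at = clock()
    # Keep polling while the idle time is still below the timeout.
    while (clock() - last_message_at) * 1000 < timeout_ms:
        message = poll()
        if message is None:
            continue  # nothing arrived; the idle time keeps growing
        handle(message)
        handled += 1
        last_message_at = clock()  # a message resets the idle timer
    return handled
```

The clock parameter is only there so the loop can be exercised without waiting in real time. Note that an idle timeout is a heuristic: it stops the consumer when the topic is quiet, which is not a guarantee that no more messages will ever be produced.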
You can pass the parameter -consumer-timeout-ms with a value when starting the consumer, and it will throw an exception if no messages are read during that time. For example, to stop the consumer if no new messages have arrived in the last 2 seconds: kafka.consumer.ConsoleConsumer -consumer-timeout-ms 2000
You can see this and all the other input options here