I was reading up details of Kafka High level consumer in this link and saw the below statement -
In practice, a more common pattern is to use sleep indefinitely and use a shutdown hook to trigger clean shutdown.
Are there any examples for doing this or pointers which could help?
This will be an example of an infinite loop
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
//do something
}
} catch (WakeupException e) {
// do nothing we are shutting down
} finally {
consumer.close();
}
}
public void shutdown() {
consumer.wakeup();
}
}
And this will be your shutdown hook.
@PostConstruct
private void init(){
addShutdownHook();
}
private void addShutdownHook(){
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
shutdown();
}
}));
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With