Is there a simple way to say if a consumer (created with spring boot and @KafkaListener) is operating normally? This includes - can access and poll a broker, has at least one partition assigned, etc.
I see there are ways to subscribe to different lifecycle events but this seems to be a very fragile solution.
Thanks in advance!
You can use the AdminClient
to get the current group status...
@SpringBootApplication
public class So56134056Application {
public static void main(String[] args) {
SpringApplication.run(So56134056Application.class, args);
}
@Bean
public NewTopic topic() {
return new NewTopic("so56134056", 1, (short) 1);
}
@KafkaListener(id = "so56134056", topics = "so56134056")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaAdmin admin) {
return args -> {
try (AdminClient client = AdminClient.create(admin.getConfig())) {
while (true) {
Map<String, ConsumerGroupDescription> map =
client.describeConsumerGroups(Collections.singletonList("so56134056")).all().get(10, TimeUnit.SECONDS);
System.out.println(map);
System.in.read();
}
}
};
}
}
{so56134056=(groupId=so56134056, isSimpleConsumerGroup=false, members=(memberId=consumer-2-32a80e0a-2b8d-4519-b71d-671117e7eaf8, clientId=consumer-2, host=/127.0.0.1, assignment=(topicPartitions=so56134056-0)), partitionAssignor=range, state=Stable, coordinator=localhost:9092 (id: 0 rack: null))}
We have been thinking about exposing getLastPollTime()
to the listener container API.
getAssignedPartitions()
has been available since 2.1.3.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With