
Kafka consumer health check

Tags: spring-kafka

Is there a simple way to tell whether a consumer (created with Spring Boot and @KafkaListener) is operating normally? That is, whether it can reach and poll a broker, has at least one partition assigned, and so on.

I see there are ways to subscribe to different lifecycle events, but that seems like a very fragile solution.

Thanks in advance!

Admit asked Oct 14 '25 02:10


1 Answer

You can use the AdminClient to get the current group status, for example:

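import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaAdmin;

@SpringBootApplication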
public class So56134056Application {

    public static void main(String[] args) {
        SpringApplication.run(So56134056Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so56134056", 1, (short) 1);
    }

    @KafkaListener(id = "so56134056", topics = "so56134056")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaAdmin admin) {
        return args -> {
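            // KafkaAdmin supplies the broker connection properties for the AdminClient.
            try (AdminClient client = AdminClient.create(admin.getConfig())) {
                while (true) {
                    // Ask the broker for the group's state, members, and partition assignments.
                    Map<String, ConsumerGroupDescription> map =
                            client.describeConsumerGroups(Collections.singletonList("so56134056")).all().get(10, TimeUnit.SECONDS);
                    System.out.println(map);
                    // Wait for input on stdin before querying again.
                    System.in.read();
                }
            }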
        };
    }

}

Output:

{so56134056=(groupId=so56134056, isSimpleConsumerGroup=false, members=(memberId=consumer-2-32a80e0a-2b8d-4519-b71d-671117e7eaf8, clientId=consumer-2, host=/127.0.0.1, assignment=(topicPartitions=so56134056-0)), partitionAssignor=range, state=Stable, coordinator=localhost:9092 (id: 0 rack: null))}
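To turn a description like the one above into a yes/no health answer, one possible rule is "the group state is Stable and at least one member has a partition assigned". The sketch below shows only that decision logic, using plain strings and collections so it runs without a broker. The class name `GroupHealth`, the method `isHealthy`, and the rule itself are assumptions for illustration, not part of spring-kafka or the Kafka client. In a real check the inputs would presumably come from `ConsumerGroupDescription.state()` and from each member's `assignment().topicPartitions()`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: not part of spring-kafka or kafka-clients.
class GroupHealth {

    /**
     * Assumed rule: healthy when the group state is "Stable" and at least
     * one member has at least one partition assigned.
     *
     * @param state   group state as text, e.g. "Stable"
     * @param members member client id mapped to its assigned partitions
     */
    public static boolean isHealthy(String state, Map<String, List<String>> members) {
        if (!"Stable".equals(state)) {
            return false; // rebalancing, empty, or dead group
        }
        for (List<String> partitions : members.values()) {
            if (!partitions.isEmpty()) {
                return true; // someone is actually consuming
            }
        }
        return false; // stable, but nobody owns a partition
    }

    public static void main(String[] args) {
        // Values copied from the sample output above.
        Map<String, List<String>> members = new HashMap<>();
        members.put("consumer-2", Arrays.asList("so56134056-0"));

        System.out.println(isHealthy("Stable", members));                // true
        System.out.println(isHealthy("PreparingRebalance", members));    // false
        System.out.println(isHealthy("Stable", Collections.emptyMap())); // false
    }
}
```

A stricter rule (for example, requiring every member to hold a partition) may suit some deployments better; the threshold is a design choice rather than something the AdminClient prescribes.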

We have been thinking about exposing getLastPollTime() to the listener container API.
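If a last-poll timestamp were available, a health check could compare it against a maximum allowed silence. The sketch below shows only that comparison. `PollStaleness`, `isStale`, and the 30-second threshold are hypothetical names and values chosen for illustration, and how the timestamp is obtained is deliberately left open, since `getLastPollTime()` is described above as something still under consideration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: the source of lastPoll is an assumption.
class PollStaleness {

    /**
     * Returns true when the consumer has never polled (lastPoll is null)
     * or when more than maxSilence has passed since the last poll.
     */
    public static boolean isStale(Instant lastPoll, Instant now, Duration maxSilence) {
        if (lastPoll == null) {
            return true; // never polled
        }
        return Duration.between(lastPoll, now).compareTo(maxSilence) > 0;
    }

    public static void main(String[] args) {
        Instant lastPoll = Instant.parse("2025-10-18T01:10:00Z");
        Duration maxSilence = Duration.ofSeconds(30); // illustrative threshold

        System.out.println(isStale(lastPoll, lastPoll.plusSeconds(5), maxSilence));  // false
        System.out.println(isStale(lastPoll, lastPoll.plusSeconds(31), maxSilence)); // true
        System.out.println(isStale(null, lastPoll, maxSilence));                     // true
    }
}
```

The threshold would normally be chosen relative to the consumer's poll interval settings, so that ordinary processing pauses are not reported as failures.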

getAssignedPartitions() has been available since 2.1.3.

Gary Russell answered Oct 18 '25 01:10