
Confluent-Kafka Python : How to list all topics programmatically

I have a Kafka broker running on my local system. To communicate with the broker from my Django-based web application, I am using the confluent-kafka wrapper. However, browsing through the admin API, I could not find any API for listing Kafka topics. (The topics are created programmatically and are dynamic.)

Is there any way I could list them within my program? The requirement is that if my worker reboots, all consumers listening to those topics have to be re-initialized, so I want to loop over all the topics and assign a consumer to each.

Paras asked Mar 22 '26 06:03


2 Answers

Here is how to do it:

>>> from confluent_kafka.admin import AdminClient

>>> conf = {'bootstrap.servers': 'vps01:9092,vps02:9092,vps03:9092'}
>>> kadmin = AdminClient(conf)

>>> kadmin.list_topics().topics # Returns a dict(). See example below.
{'topic01': TopicMetadata(topic01, 3 partitions),}

I hope that helps.

Edit: There may be newer APIs these days (I don't know), but the above is how I used to do it, and it worked well.
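
As far as I know, `list_topics()` accepts a `timeout` argument and waits indefinitely by default, which can hang a worker's startup if the broker is down. Here is a minimal sketch of a wrapper that passes a timeout. The names `topic_names` and `main`, the 10-second timeout, and the `localhost:9092` address are assumptions for illustration, not part of the library or the question.

```python
def topic_names(client, timeout=10.0):
    """Return a sorted list of topic names visible to `client`.

    `client` is assumed to be a confluent-kafka AdminClient, Consumer,
    or Producer; each of them exposes list_topics(). This helper is
    hypothetical, not part of the library.
    """
    # list_topics() returns cluster metadata whose .topics attribute is a
    # dict mapping topic name -> TopicMetadata. Iterating it yields names.
    metadata = client.list_topics(timeout=timeout)
    return sorted(metadata.topics)


def main():
    # Imported here so topic_names() can be exercised without the
    # library or a broker being available.
    from confluent_kafka import KafkaException
    from confluent_kafka.admin import AdminClient

    # Assumed broker address; replace with your own bootstrap servers.
    admin = AdminClient({'bootstrap.servers': 'localhost:9092'})
    try:
        print(topic_names(admin))
    except KafkaException as exc:
        # Raised, for example, when no broker answers within the timeout.
        print(f'could not fetch topic metadata: {exc}')
```

Calling `main()` against a running broker should print the topic names; with no broker it should report the error after the timeout instead of blocking.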

NYCeyes answered Mar 23 '26 18:03


There are two ways to do this. The first uses a Consumer:

from confluent_kafka import Consumer

consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'group1'
}
consumer = Consumer(consumer_config)
consumer.list_topics().topics  # dict of topic name -> TopicMetadata

This returns a dictionary like this:

{'raw_request': TopicMetadata(raw_request, 4 partitions),
 'raw_requests': TopicMetadata(raw_requests, 1 partitions),
 'idw': TopicMetadata(idw, 1 partitions),
 '__consumer_offsets': TopicMetadata(__consumer_offsets, 50 partitions)}

The other way is the AdminClient approach already posted by @NYCeyes.
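
Note that the dictionary includes `__consumer_offsets`, which is an internal topic. For the use case in the question (one consumer per topic after a worker restart) you would probably want to skip those. Below is a rough sketch, assuming internal topics are the ones whose names start with `__`; `user_topics` and `start_consumers` are hypothetical helper names, and the config values are the same placeholders as above.

```python
def user_topics(names):
    """Drop internal topics, assumed here to be those starting with '__'."""
    return sorted(name for name in names if not name.startswith('__'))


def start_consumers(bootstrap_servers='localhost:9092', group_id='group1'):
    """Create one subscribed Consumer per user topic (hypothetical helper)."""
    # Imported here so user_topics() can be used without the library.
    from confluent_kafka import Consumer

    config = {'bootstrap.servers': bootstrap_servers, 'group.id': group_id}

    # A throwaway consumer used only to fetch the topic list.
    probe = Consumer(config)
    try:
        names = user_topics(probe.list_topics(timeout=10).topics)
    finally:
        probe.close()

    # One consumer per topic, keyed by topic name.
    consumers = {}
    for name in names:
        consumer = Consumer(config)
        consumer.subscribe([name])
        consumers[name] = consumer
    return consumers
```

Each returned consumer still has to be polled (and eventually closed) by your worker; this only covers discovering the topics and subscribing.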

PanicLion answered Mar 23 '26 19:03