I have a kafka broker running in my local system. To communicate with the broken using my Django based web application I am using confluent-kafka wrapper. However by browsing through the admin api, I could not find out any api for listing kafka topics. (The topics are created pragmatically and are dynamic).
Is there any way I could list them within my program? The requirement is that, if my worker reboots all assigned consumers listening to those topics will have to be re-initialized so I want to loop to all the topics and assign a consumer to each.
Here is how to do it:
>>> from confluent_kafka.admin import AdminClient
>>> conf = {'bootstrap.servers': 'vps01:9092,vps02:9092,vps03:9092'}
>>> kadmin = AdminClient(conf)
>>> kadmin.list_topics().topics # Returns a dict(). See example below.
{'topic01': TopicMetadata(topic01, 3 partitions),}
I hope that helps.
Edit: There may be newer APIs these days (I don't know), but above is how I used to do and it worked well.
Two ways you can achieve it:
from confluent_kafka import Consumer
consummer_config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'group1'
}
consumer = Consumer(consummer_config)
consumer.list_topics().topics
This will return a dictionary something like this:
{'raw_request': TopicMetadata(raw_request, 4 partitions),
'raw_requests': TopicMetadata(raw_requests, 1 partitions),
'idw': TopicMetadata(idw, 1 partitions),
'__consumer_offsets': TopicMetadata(__consumer_offsets, 50 partitions)}
the other way is already posted by @NYCeyes
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With