 

Confluent Kafka: Consumer does not read from beginning for all partitions in a topic

I have a topic with 40 partitions. My consumer is set up like this:

from confluent_kafka import Consumer

def on_assign(c, ps):
    # Rewind every newly assigned partition to offset 0 before consuming
    for p in ps:
        p.offset = 0
    print(ps)
    c.assign(ps)

conf = {'bootstrap.servers': 'localhost:9092',
        'enable.auto.commit': False,
        'group.id': 'confluent_consumer',
        'default.topic.config': {'auto.offset.reset': 'earliest'}
        }
consumer = Consumer(conf)
consumer.subscribe(['topic.source'], on_assign=on_assign)

# poll() returns None on timeout, so check before reading the message
msg = consumer.poll(timeout=100000)
if msg is not None and not msg.error():
    print("Topic is %s | Partition is %d | Offset is %d | key is %s" % (msg.topic(), msg.partition(), msg.offset(), msg.key()))

I want to read from offset 0 on every partition of topic.source, but that doesn't happen for all of them. Some partitions start from a specific offset, which I assume is the committed offset, and changing group.id every time doesn't help either. How can I read all partitions of this topic from the beginning, regardless of committed offsets?

I printed ps in on_assign() and it printed something like this for all 40 partitions:

[TopicPartition{topic=topic.source,partition=0,offset=0,error=None},TopicPartition{topic=topic.source,partition=1,offset=0,error=None}....] and so on
asked Sep 19 '25 20:09 by NoName


1 Answer

If you set group.id to a new value, or use a group that has not committed any offsets, and auto.offset.reset is set to earliest, the consumer will start from the beginning of each partition.
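As a rough sketch of that advice, the helper below builds a config whose group.id is unique on every call, so the group can have no committed offsets and auto.offset.reset alone decides the starting point. The function name make_fresh_consumer_config and the "replay" prefix are hypothetical, and placing auto.offset.reset at the top level (rather than under default.topic.config) assumes a reasonably recent confluent-kafka release that accepts it there.

```python
import uuid


def make_fresh_consumer_config(bootstrap_servers, group_prefix="replay"):
    """Hypothetical helper: consumer config for a group with no committed offsets.

    Because the group.id is unique, the broker holds no committed offsets
    for it, so auto.offset.reset decides where each partition starts.
    """
    return {
        "bootstrap.servers": bootstrap_servers,
        # A random suffix gives a brand-new group on every call
        "group.id": "%s-%s" % (group_prefix, uuid.uuid4().hex),
        # Only consulted when the group has no valid committed offset
        "auto.offset.reset": "earliest",
        # Disable auto-commit so this throwaway group stores no offsets
        "enable.auto.commit": False,
    }


conf = make_fresh_consumer_config("localhost:9092")
print(conf["group.id"])
```

The resulting dict would then be passed to confluent_kafka.Consumer(conf) in place of the fixed 'confluent_consumer' group. The trade-off is that each run leaves an abandoned group behind on the broker, which is usually acceptable for one-off replays.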

That said, the beginning might not be offset 0. Depending on your broker's log retention settings, Kafka may already have deleted older messages, so the first available message in a partition can be at any offset.
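To illustrate why offset 0 may not be where reading actually begins, here is a simplified model (not librdkafka's code) of how a requested start offset is resolved against a partition's watermarks. The function effective_start_offset is hypothetical: low stands for the oldest offset still retained and high for the offset the next produced message will receive. A requested offset outside that range is treated as out of range, and the reset policy picks the replacement.

```python
def effective_start_offset(requested, low, high, reset="earliest"):
    """Simplified model of where a consumer starts reading one partition."""
    # An offset inside the retained range is used as-is
    if low <= requested <= high:
        return requested
    # Out of range (e.g. offset 0 after retention deleted old segments)
    if reset == "earliest":
        return low
    if reset == "latest":
        return high
    # Any other policy is modeled here as a hard error
    raise ValueError("offset %d out of range [%d, %d]" % (requested, low, high))


# Partition still holds offset 0: reading starts at 0
print(effective_start_offset(0, 0, 500))
# Retention removed offsets below 1200: reading starts at 1200
print(effective_start_offset(0, 1200, 1500))
```

In confluent-kafka the real watermarks for a partition can be fetched with Consumer.get_watermark_offsets(TopicPartition(topic, partition)), which returns a (low, high) tuple. Assigning the OFFSET_BEGINNING constant instead of 0 in on_assign asks the client to start at the low watermark directly rather than relying on an out-of-range reset.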

answered Sep 22 '25 08:09 by Mickael Maison