
How to consume messages in last N days using confluent-kafka-python?

This question is similar to "Python KafkaConsumer start consuming messages from a timestamp", except that I want to know how to do it with the official Python Kafka client by Confluent.

I looked into the Consumer.offsets_for_times function, but I'm confused by the fact that it accepts timestamps in the TopicPartition.offset field.

How is an offset equivalent to a timestamp?

wxh asked Oct 18 '25 at 15:10


1 Answer

I did this recently for $work. You need to get the result of offsets_for_times(), then assign() that list to your consumer, and then call consume(). Importantly, don't subscribe() to the topic. (See Edenhill's comment on https://github.com/confluentinc/confluent-kafka-python/issues/373.)
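Put together, those steps might look like the following minimal sketch. It assumes an ordinary Consumer config dict (bootstrap.servers, group.id, ...), looks the partitions up with list_topics() instead of hard-coding them, and stops as soon as one consume() call comes back empty. The names days_ago_ms and consume_last_n_days, and that idle-timeout stopping rule, are illustrative choices of mine, not part of the library.

```python
from datetime import datetime, timedelta, timezone


def days_ago_ms(days, now=None):
    """Epoch timestamp in milliseconds for `days` days before `now` (default: current UTC time)."""
    if now is None:
        now = datetime.now(timezone.utc)
    return int((now - timedelta(days=days)).timestamp() * 1000)


def consume_last_n_days(conf, topic, days, batch_size=100, idle_timeout=5.0):
    """Yield messages from every partition of `topic` timestamped within the last `days` days.

    Assumption: stopping after one empty consume() call is a simplification for
    "read the backlog, then exit"; a long-running consumer would keep polling.
    """
    # Imported here so days_ago_ms() can be used without the client installed.
    from confluent_kafka import Consumer, KafkaException, TopicPartition

    consumer = Consumer(conf)
    try:
        start_ms = days_ago_ms(days)
        # Discover the topic's partition ids from cluster metadata.
        metadata = consumer.list_topics(topic, timeout=10.0)
        partition_ids = metadata.topics[topic].partitions.keys()
        # Input: the offset field carries a timestamp in ms since the epoch.
        query = [TopicPartition(topic, p, start_ms) for p in partition_ids]
        # Output: offset is the earliest offset whose timestamp >= start_ms,
        # or -1 (the end of the partition) if no such message exists.
        start_offsets = consumer.offsets_for_times(query, timeout=10.0)
        # assign() rather than subscribe(), so these explicit positions are used.
        consumer.assign(start_offsets)
        while True:
            messages = consumer.consume(num_messages=batch_size, timeout=idle_timeout)
            if not messages:
                break
            for msg in messages:
                if msg.error():
                    raise KafkaException(msg.error())
                yield msg
    finally:
        consumer.close()
```

Iterating over consume_last_n_days(conf, "my-topic", 7) would then replay roughly the last week of "my-topic" (a placeholder name) and close the consumer when the backlog is drained.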

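You're right that the documentation for this function is confusing when it comes to timestamps vs. offsets. The TopicPartition.offset field is overloaded: in the list you pass to offsets_for_times() it holds a timestamp in milliseconds since the epoch, and in the list it returns it holds the earliest offset whose timestamp is greater than or equal to that timestamp (or -1 if the timestamp is later than the last message in the partition).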
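To make that overloading concrete, here is a toy, pure-Python model of the per-partition lookup. It only mirrors the documented contract; it is not how the client or the broker actually implements it (the broker uses a time index), and offset_for_time plus the sample partition data are hypothetical.

```python
def offset_for_time(records, target_ms):
    """Toy model of the lookup behind offsets_for_times() for one partition.

    `records` is a list of (offset, timestamp_ms) pairs in log order.
    Returns the earliest offset whose timestamp is >= target_ms, or -1 if
    every message in the partition is older than target_ms.
    """
    for offset, timestamp_ms in records:
        if timestamp_ms >= target_ms:
            return offset
    return -1


# Hypothetical partition: three messages written one second apart.
partition = [(100, 1_000), (101, 2_000), (102, 3_000)]
print(offset_for_time(partition, 1_500))  # 101
print(offset_for_time(partition, 5_000))  # -1
```

So a timestamp goes in and an offset comes out, in the same field. The timestamp being compared is the message timestamp, which is either the producer's CreateTime or the broker's LogAppendTime depending on the topic's message.timestamp.type setting.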

Update to answer a follow-up question:

The difference from "How do I get the offset of the last message of a Kafka topic using confluent-kafka-python?" is that rather than

topicparts = [TopicPartition(topic_name, i) for i in range(0, 8)]

you would do something like this:

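from datetime import datetime
from confluent_kafka import TopicPartition

whents = datetime.fromisoformat("2022-01-01T12:34:56.000")  # naive, so read as local time
whenms = int(whents.timestamp() * 1000)   # milliseconds since the epoch

topicparts = [TopicPartition(topic_name, i, whenms) for i in range(0, 8)]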

James McPherson answered Oct 21 '25 at 06:10