
How to set the consumer in Faust to a specific offset

Tags:

faust

From the Faust documentation I can't find out how to set the consumer to a specific offset.

With confluent-kafka I use consumer.offsets_for_times to find a start_offset and then assign the TopicPartition to that specific offset, something like:

start_offset = consumer.offsets_for_times([
    TopicPartition("prediction.OfferPredictionCheckpoint", 0, int(start_date)),
    TopicPartition("prediction.OfferPredictionCheckpoint", 1, int(start_date)),
])

consumer.assign([
    TopicPartition("prediction.OfferPredictionCheckpoint", partition_number, pos)
])

With Faust I can't find much more than:

consumer_auto_offset_reset

which only lets you choose earliest or latest. How would I start reading from a specific hour, or from the beginning of the day?

galinden asked Oct 27 '25 13:10


1 Answer

To set the offset to a specific value, you can use the example below. Here I set the offset to 50000, so every time I start my app, the agent starts reading at offset 50000. This is done with app.consumer.seek.

Here TP takes two parameters: the topic (test in this case) and the partition number (0). For more information, see faust.types.

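import faust
from faust.types import TP

# The app id and broker URL are placeholders; replace them with your own.
app = faust.App("offset-demo", broker="kafka://localhost:9092")

tp = TP("test", 0)  # topic "test", partition 0
topic = app.topic(tp.topic)

@app.task()
async def on_start():
    # Move partition 0 of "test" to offset 50000 when the app starts.
    await app.consumer.seek(tp, 50000)
    print("App started")

@app.agent(topic)
async def receive(stream):
    async for event in stream.events():
        print((event.message.offset, event.value))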
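
The example above seeks to a fixed offset, while the question asks about starting from a specific hour or from the beginning of the day. With confluent-kafka, offsets_for_times expects the timestamp in milliseconds since the epoch, passed in the offset field of each TopicPartition. Below is a minimal sketch of how such a timestamp might be computed, assuming UTC boundaries; the helper names start_of_day_ms and start_of_hour_ms are hypothetical and not part of Faust or confluent-kafka.

```python
from datetime import datetime, timezone


def start_of_day_ms(moment: datetime) -> int:
    """Epoch milliseconds for 00:00 UTC on the day containing `moment`."""
    # A naive datetime is interpreted as local time by astimezone().
    utc = moment.astimezone(timezone.utc)
    midnight = utc.replace(hour=0, minute=0, second=0, microsecond=0)
    return int(midnight.timestamp() * 1000)


def start_of_hour_ms(moment: datetime) -> int:
    """Epoch milliseconds for the top of the UTC hour containing `moment`."""
    utc = moment.astimezone(timezone.utc)
    top_of_hour = utc.replace(minute=0, second=0, microsecond=0)
    return int(top_of_hour.timestamp() * 1000)


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    print(start_of_day_ms(now), start_of_hour_ms(now))
```

The resulting value could take the place of int(start_date) in the offsets_for_times call from the question, and the offset that lookup returns for a partition could then be passed to app.consumer.seek instead of the hard-coded 50000.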
Phil997 answered Oct 29 '25 18:10


