Flink app's checkpoint size keeps growing

I have a pipeline like this:

env.addSource(kafkaConsumer, name_source)
    .keyBy { value -> value.f0 }                               // key by the first tuple field
    .window(EventTimeSessionWindows.withGap(Time.seconds(2)))  // session closes after a 2 s gap
    .process(MyProcessor())
    .addSink(kafkaProducer)

The keys are guaranteed to be unique in the data currently being processed, so I would expect the state to hold no more than about 2 seconds' worth of data.
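For context, MyProcessor is a ProcessWindowFunction. A minimal hypothetical sketch of what it might look like (the Tuple2<String, String> element type and the output record are assumptions, not my actual code):

import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical sketch: assumes elements are Tuple2<String, String> keyed by f0
// and that one summary record is emitted per session window.
class MyProcessor : ProcessWindowFunction<Tuple2<String, String>, String, String, TimeWindow>() {
    override fun process(
        key: String,
        context: Context,
        elements: Iterable<Tuple2<String, String>>,
        out: Collector<String>
    ) {
        // The window's contents are released once the session window fires
        // and is purged, which is why I expect state to stay bounded.
        out.collect("$key -> ${elements.count()} events in session")
    }
}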

However, I notice the state size has been steadily growing over the last day (since the app was deployed).

[Screenshot: checkpoint size graph, steadily increasing since deployment]

Is this a bug in Flink?

I am using Flink 1.11.2 in AWS Kinesis Data Analytics.

asked Sep 05 '25 by pdeva

1 Answer

Kinesis Data Analytics always uses RocksDB as its state backend. With RocksDB, dead state isn't immediately cleaned up; it's merely marked with a tombstone and later compacted away. I'm not sure how KDA configures RocksDB compaction, but typically compaction is triggered when a level reaches a certain size, and I suspect your state is still small enough that it hasn't happened yet.
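KDA doesn't let you tune these knobs, but for illustration, in a self-managed Flink 1.11 job you could make compaction kick in at a smaller state size via a RocksDB options factory. The thresholds below are made-up examples, not recommendations:

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory
import org.rocksdb.ColumnFamilyOptions
import org.rocksdb.DBOptions

// Illustrative only (not applicable on KDA): lower RocksDB's level-size
// thresholds so compaction, which physically drops tombstoned entries,
// runs at a smaller total state size. The values here are arbitrary.
class EagerCompactionOptions : RocksDBOptionsFactory {
    override fun createDBOptions(
        currentOptions: DBOptions,
        handlesToClose: MutableCollection<AutoCloseable>
    ): DBOptions = currentOptions

    override fun createColumnOptions(
        currentOptions: ColumnFamilyOptions,
        handlesToClose: MutableCollection<AutoCloseable>
    ): ColumnFamilyOptions = currentOptions
        .setTargetFileSizeBase(8L * 1024 * 1024)       // 8 MB SST files
        .setMaxBytesForLevelBase(32L * 1024 * 1024)    // start leveling sooner
}

You would register this with setRocksDBOptions(EagerCompactionOptions()) on a self-managed RocksDBStateBackend.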

With incremental checkpoints (which is what KDA uses), a checkpoint is taken by copying RocksDB's SST files, which in your case are presumably full of stale data. If you let this run long enough, you should eventually see a significant drop in checkpoint size once compaction has run.
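For reference, this is roughly how the backend KDA manages for you would be set up in self-managed Flink 1.11 (the S3 URI and the checkpoint interval are placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// The second argument enables incremental checkpoints: each checkpoint
// uploads only the SST files created since the previous one, so the
// reported checkpoint size tracks RocksDB's on-disk size, including
// tombstones that haven't been compacted away yet.
env.setStateBackend(RocksDBStateBackend("s3://my-bucket/checkpoints", true))
env.enableCheckpointing(60_000L) // placeholder: checkpoint every 60 s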

answered Sep 07 '25 by David Anderson