According to the Kafka Streams docs, peek, filter, and branch are stateless operations. However, I want to do some stateful processing in these operators. For example, I want to run a query and filter messages based on the results. Can I do that?
The operations peek(), filter(), and branch() are inherently stateless. When you say:
I want to run a query and filter messages based on the results
then it depends on what you want to query. It's possible (but not recommended) to query an "external" API. However, there is no built-in support for it, and there are many corner cases to consider to make it robust. Note, though, that querying an external system does not make the operation stateful.
If you want to work with state, you can use transform() (and its siblings) to build custom operators backed by a state store. If you name all your downstream operators (via Named and similar), you can use context.forward(..., To.child(...)) to implement a custom branch. For filtering, you can return null from transform() so that nothing is forwarded.
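As a rough illustration of the transform() approach, here is a minimal sketch of a "stateful filter" that drops a record when its value equals the last value seen for the same key. The topic names, the store name, and the class names are made up for the example, and the dedup rule is just one possible filter condition. Note that transform() is deprecated in recent Kafka Streams releases in favor of process(), so treat this as a sketch for versions where transform() is still available.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class StatefulFilterSketch {
    // Hypothetical store name, chosen only for this example.
    static final String STORE = "last-value-store";

    // Forwards a record only if its value differs from the last value stored for its key.
    static class ChangedValueFilter
            implements Transformer<String, String, KeyValue<String, String>> {
        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            // The store is available here because it is connected in transform(..., STORE).
            store = (KeyValueStore<String, String>) context.getStateStore(STORE);
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            if (key == null) {
                // Stores do not accept null keys; pass such records through unfiltered.
                return KeyValue.pair(key, value);
            }
            String previous = store.get(key);
            if (value != null && value.equals(previous)) {
                return null; // returning null forwards nothing, i.e. the record is filtered out
            }
            store.put(key, value);
            return KeyValue.pair(key, value);
        }

        @Override
        public void close() {}
    }

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // Register the state store with the topology.
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE), Serdes.String(), Serdes.String()));
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .transform(ChangedValueFilter::new, STORE) // connect the store by name
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```

The store is fault-tolerant through its changelog topic, which is the main difference from keeping the same data in a plain in-memory map inside filter().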
I'm not sure what a stateful peek() would be used for, but you could build that the same way.
Depending on the use-case, it's also possible to implement a "stateful filter" via a stream-table join or stream-globalTable join.
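A minimal sketch of the join-based variant, assuming the filter condition can be expressed as "the record's key is present in a table": an inner stream-table join only emits a result when the table holds a value for the stream record's key, so all other records are dropped. The topic names and the class name are hypothetical, and both topics are assumed to be co-partitioned and keyed by the same String key.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class JoinFilterSketch {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Records to be filtered (hypothetical topic name).
        KStream<String, String> events =
                builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

        // Table holding the keys that are allowed through (hypothetical topic name).
        KTable<String, String> allowed =
                builder.table("allowed-keys", Consumed.with(Serdes.String(), Serdes.String()));

        events
            // Inner join: emits only when the table currently has a value for the key.
            // The joiner keeps the original event value, so the join acts as a filter.
            .join(allowed,
                  (eventValue, allowedValue) -> eventValue,
                  Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
            .to("filtered-events", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```

Here the "state" is the table itself, so changing the filter criteria is a matter of writing to the table's topic (a tombstone removes a key). A stream-globalTable join looks similar but does not require co-partitioning.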