The question is whether it is safe to use external state storage inside stream functions such as filter, map, etc.
Is it OK to do something like this:
```java
JedisPool pool = ...;
KStream<String, String> stream = ...;
stream.map((k, v) -> {
    Jedis client = pool.getResource();
    // ...
    client.close();
    return KeyValue.pair(k, v);
});
...
KafkaStreams streams = ...;
```
Can this cause errors because a single pool is used by multiple stream tasks?
In Apache Flink I can use a Rich*Function<>, where I configure the connection pool to any storage only once, inside the open() method. In Apache Spark I can also configure global connections. Do I need to do the same in Kafka Streams?
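The Kafka Streams equivalent of a Rich*Function is to use transform() instead of map(). transform() takes a Transformer, whose init() and close() methods let you set up and release resources such as a connection pool once per task.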
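A minimal sketch of the transform() approach, assuming String keys and values and a Redis instance reachable at a host and port passed to the constructor. The class name RedisLookupTransformer, the enrich() helper and the "value|extra" output format are hypothetical; what matters is the lifecycle: each stream task gets its own Transformer instance, so the pool is created in init() and released in close(), much like open() and close() in a Flink Rich*Function.

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

// Hypothetical example: enrich each value with a Redis lookup keyed by the record key.
public class RedisLookupTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private final String redisHost; // assumption: connection details supplied by the caller
    private final int redisPort;
    private JedisPool pool;

    public RedisLookupTransformer(String redisHost, int redisPort) {
        this.redisHost = redisHost;
        this.redisPort = redisPort;
    }

    // Hypothetical enrichment rule: pass the value through unchanged if Redis has no entry.
    static String enrich(String value, String extra) {
        return extra == null ? value : value + "|" + extra;
    }

    @Override
    public void init(ProcessorContext context) {
        // Called once for this task's instance: create the pool here,
        // similar to open() in a Flink Rich*Function.
        pool = new JedisPool(redisHost, redisPort);
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // try-with-resources returns the connection to the pool even if get() throws
        try (Jedis jedis = pool.getResource()) {
            return KeyValue.pair(key, enrich(value, jedis.get(key)));
        }
    }

    @Override
    public void close() {
        // Called when the task is closed: release all pooled connections.
        if (pool != null) {
            pool.close();
        }
    }
}
```

It would be wired in with something like `stream.transform(() -> new RedisLookupTransformer("localhost", 6379))`. The supplier must return a new instance on every call, because Kafka Streams asks it for one instance per task. In newer Kafka Streams releases transform() is deprecated in favor of process() with the new Processor API, which offers the same init()/close() lifecycle.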
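Your approach should also work: JedisPool is thread-safe and can be shared by multiple stream threads (only the individual Jedis connections must not be shared). You may want to wrap the call in try/finally to make sure close() is always executed. However, it is not a recommended pattern.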
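If you keep the original approach, a sketch like the following uses try-with-resources (possible because Jedis implements Closeable), so the connection goes back to the pool even when the lookup throws. The class name SharedPoolMap, the method name enrichWithRedis and the lookup by record key are hypothetical.

```java
import org.apache.kafka.streams.kstream.KStream;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class SharedPoolMap {

    // One JedisPool shared by all stream threads: the pool is thread-safe,
    // and each record borrows its own Jedis connection for the duration of the lookup.
    static KStream<String, String> enrichWithRedis(KStream<String, String> stream, JedisPool pool) {
        return stream.mapValues((k, v) -> {
            // try-with-resources returns the connection to the pool even if get() throws
            try (Jedis client = pool.getResource()) {
                String extra = client.get(k); // hypothetical lookup keyed by the record key
                return extra == null ? v : v + "|" + extra;
            }
        });
    }
}
```

Since the key is not changed, mapValues() is used here instead of map(); map() marks the stream for repartitioning if a later operation needs co-partitioned data, while mapValues() does not.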
Depending on your use case, it might be better to load the data from Redis into a Kafka topic (I'm not sure whether a Redis connector exists) and read that topic into a KTable. Instead of a map() or transform(), you would then do a stream-table join.
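A sketch of the stream-table join, assuming the Redis data has already been copied into a Kafka topic whose records use the same keys as the event stream. The topic names, the class name RedisTableJoin and the joinValues() helper are hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class RedisTableJoin {

    // Hypothetical joiner: combine an event with the latest table value for its key.
    static String joinValues(String event, String lookup) {
        return event + "|" + lookup;
    }

    static Topology build(String eventsTopic, String lookupTopic, String outputTopic) {
        StreamsBuilder builder = new StreamsBuilder();
        Consumed<String, String> strings = Consumed.with(Serdes.String(), Serdes.String());

        // Assumption: lookupTopic holds the data copied from Redis, keyed like the events.
        KTable<String, String> lookup = builder.table(lookupTopic, strings);
        KStream<String, String> events = builder.stream(eventsTopic, strings);

        // Inner stream-table join: events whose key is not in the table are dropped.
        events.join(lookup, RedisTableJoin::joinValues)
              .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```

For the join to match records, both topics must be co-partitioned: same keys, same number of partitions, and the same partitioning strategy. Each lookup then reads the KTable's local state store instead of making a network call to Redis per record.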