 

Kafka Streams - all instances' local stores pointing to the same topic

We have the following problem:

We want to listen to a certain Kafka topic and build its "history": for a given key, extract some data, append it to the already existing list for that key (or create a new list if none exists), and put the result into another topic, which has only a single partition and is log-compacted. Another app can then listen to that topic and update its own history list.

I'm thinking about how this fits with the Kafka Streams library. We can certainly use aggregation:

msgReceived.map((key, word) -> new KeyValue<>(key, word))
           .groupBy((k,v) -> k, stringSerde, stringSerde)
           .aggregate(String::new,
                     (k, v, stockTransactionCollector) -> stockTransactionCollector + "|" + v,
                     stringSerde, "summaries2")
           .to(stringSerde, stringSerde, "transaction-summary50");

which creates a local state store backed by a Kafka changelog topic and uses it as the history table.
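To make the per-key behaviour of that aggregate() call concrete, here is a plain-Java model of it (no Kafka dependency). It is a sketch under stated assumptions, not the Kafka Streams implementation: a LinkedHashMap stands in for the "summaries2" store, a list stands in for the "transaction-summary50" output topic, and every update is forwarded (real Kafka Streams may coalesce updates through record caching). Note that with String::new as the initializer the history starts with a leading "|".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HistoryAggregationModel {
    // Hypothetical stand-in for the local state store "summaries2".
    private final Map<String, String> store = new LinkedHashMap<>();
    // Hypothetical stand-in for records written to "transaction-summary50".
    private final List<String> output = new ArrayList<>();

    // Mirrors aggregate(String::new, (k, v, agg) -> agg + "|" + v, ...).
    public void process(String key, String value) {
        String history = store.getOrDefault(key, new String()); // initializer: empty string
        String updated = history + "|" + value;                 // aggregator: append the new value
        store.put(key, updated);                                // update the store (Kafka Streams also logs this to the changelog)
        output.add(key + "=" + updated);                        // forward the new aggregate downstream
    }

    public String history(String key) {
        return store.get(key);
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        HistoryAggregationModel m = new HistoryAggregationModel();
        m.process("A", "t1");
        m.process("B", "t2");
        m.process("A", "t3");
        System.out.println(m.history("A")); // |t1|t3
        System.out.println(m.output());     // [A=|t1, B=|t2, A=|t1|t3]
    }
}
```

Each output record carries the whole history for its key, which is why a log-compacted output topic works: a consumer only needs the latest record per key.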

My concern is that if we decide to scale such an app, each running instance will create a new backing topic ${applicationId}-${storeName}-changelog (I assume each app has a different applicationId). Each instance starts consuming the input topic, gets a different set of keys, and builds a different subset of the state. If Kafka decides to rebalance, some instances will start missing historic state in their local stores, because they get a completely new set of partitions to consume from.

My question is: if I simply set the same applicationId for every running instance, will each instance eventually replay all data from the very same Kafka topic, so that every running instance has the same local state?

Lukáš Gemela asked Dec 06 '25 06:12


2 Answers

Why would you create multiple apps with different IDs to perform the same job? Kafka Streams achieves parallelism through tasks:

An application’s processor topology is scaled by breaking it into multiple tasks.

More specifically, Kafka Streams creates a fixed number of tasks based on the input stream partitions for the application, with each task assigned a list of partitions from the input streams (i.e., Kafka topics). The assignment of partitions to tasks never changes so that each task is a fixed unit of parallelism of the application.

Tasks can then instantiate their own processor topology based on the assigned partitions; they also maintain a buffer for each of its assigned partitions and process messages one-at-a-time from these record buffers. As a result stream tasks can be processed independently and in parallel without manual intervention.
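The task model quoted above can be sketched in plain Java. This is a simplified illustration, assuming a single sub-topology reading one input topic, so there is exactly one task per input partition, named in the "0_<partition>" style Kafka Streams uses for task IDs. The round-robin assign() helper is hypothetical; the real assignor is sticky and also weighs standby replicas and restoration lag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TaskAssignmentModel {
    // One task per input partition; the partition-to-task mapping is fixed.
    public static List<String> createTasks(int inputPartitions) {
        List<String> tasks = new ArrayList<>();
        for (int p = 0; p < inputPartitions; p++) {
            tasks.add("0_" + p); // sub-topology 0, partition p
        }
        return tasks;
    }

    // Hypothetical round-robin distribution of tasks over the running instances.
    public static Map<String, List<String>> assign(List<String> tasks, List<String> instances) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String instance : instances) {
            assignment.put(instance, new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            assignment.get(instances.get(i % instances.size())).add(tasks.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> tasks = createTasks(4);
        System.out.println(assign(tasks, List.of("instance-1")));
        // {instance-1=[0_0, 0_1, 0_2, 0_3]}
        System.out.println(assign(tasks, List.of("instance-1", "instance-2")));
        // {instance-1=[0_0, 0_2], instance-2=[0_1, 0_3]}
    }
}
```

Adding an instance changes which instance runs a task, never which partition a task owns; the number of input partitions caps the useful parallelism.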

If you need to scale your app, you can start new instances running the same app (same application ID), and some of the already assigned tasks will be reassigned to the new instances. The migration of the local state stores is handled automatically by the library:

When the re-assignment occurs, some partitions – and hence their corresponding tasks including any local state stores – will be “migrated” from the existing threads to the newly added threads. As a result, Kafka Streams has effectively rebalanced the workload among instances of the application at the granularity of Kafka topic partitions.
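Why a migrated task does not lose history can be shown with a small plain-Java model. Assumptions, all hypothetical: the changelog topic is modelled as an in-memory map from partition to a list of (key, aggregate) updates, partitionFor() is a simplified stand-in for the default partitioner (which actually hashes the serialized key with murmur2), and restore() plays the role of Kafka Streams rebuilding a store from its changelog partition on the new owner.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StateMigrationModel {
    static final int PARTITIONS = 2;
    // Hypothetical changelog "${applicationId}-${storeName}-changelog": updates per partition.
    static final Map<Integer, List<String[]>> changelog = new HashMap<>();

    // Simplified stand-in for the default partitioner.
    static int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), PARTITIONS);
    }

    // A task owns one partition and a local store holding only that partition's keys.
    static class Task {
        final int partition;
        final Map<String, String> store = new HashMap<>();

        Task(int partition) {
            this.partition = partition;
        }

        void process(String key, String value) {
            String updated = store.getOrDefault(key, "") + "|" + value;
            store.put(key, updated);
            // Every store update is also appended to the matching changelog partition.
            changelog.computeIfAbsent(partition, p -> new ArrayList<>()).add(new String[] {key, updated});
        }

        // New owner rebuilds the store by replaying the task's changelog partition in order;
        // the last update per key wins, which is exactly what log compaction preserves.
        static Task restore(int partition) {
            Task t = new Task(partition);
            for (String[] kv : changelog.getOrDefault(partition, List.of())) {
                t.store.put(kv[0], kv[1]);
            }
            return t;
        }
    }

    public static void main(String[] args) {
        Task original = new Task(partitionFor("A")); // running on instance-1
        original.process("A", "t1");
        original.process("A", "t2");
        Task migrated = Task.restore(original.partition); // task moved to instance-2
        System.out.println(migrated.store.get("A")); // |t1|t2
        migrated.process("A", "t3");
        System.out.println(migrated.store.get("A")); // |t1|t2|t3
    }
}
```

Because the key always maps to the same partition, and hence the same task and changelog partition, whichever instance runs that task can recover the full history before processing new records.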

I recommend having a look at this guide.

jose.goncabel answered Dec 10 '25 08:12



Some assumptions are not correct:

  • If you run multiple instances of your application to scale it, all of them must have the same application ID (cf. Kafka's consumer group management protocol); otherwise, the load will not be shared, because each instance is considered a separate application and gets all partitions assigned.

Thus, if all instances use the same application ID, all running instances will use the same changelog topic name, and what you intend to do should work out of the box.
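A plain-Java sketch of the two points above, assuming a simplified consumer-group model rather than Kafka's actual protocol: instances configured with the same application ID form one group and split the partitions (round-robin here, for illustration), while each distinct application ID is its own group and receives every partition. The changelog name follows the ${applicationId}-${storeName}-changelog convention from the question; the IDs "history-app", "app-a" and "app-b" are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ApplicationIdModel {
    // Hypothetical instance descriptor: a name plus its configured application ID.
    static final class Instance {
        final String name;
        final String applicationId;

        Instance(String name, String applicationId) {
            this.name = name;
            this.applicationId = applicationId;
        }
    }

    // Changelog naming convention quoted in the question.
    public static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Simplified group model: members of one group split partitions; separate groups each get all of them.
    public static Map<String, List<Integer>> assign(List<Instance> instances, int partitions) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (Instance inst : instances) {
            groups.computeIfAbsent(inst.applicationId, id -> new ArrayList<>()).add(inst.name);
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (List<String> members : groups.values()) {
            for (String member : members) {
                result.put(member, new ArrayList<>());
            }
            for (int p = 0; p < partitions; p++) {
                result.get(members.get(p % members.size())).add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(changelogTopic("history-app", "summaries2"));
        // history-app-summaries2-changelog
        System.out.println(assign(List.of(new Instance("i1", "history-app"), new Instance("i2", "history-app")), 4));
        // {i1=[0, 2], i2=[1, 3]}
        System.out.println(assign(List.of(new Instance("i1", "app-a"), new Instance("i2", "app-b")), 4));
        // {i1=[0, 1, 2, 3], i2=[0, 1, 2, 3]}
    }
}
```

With a shared ID, both instances also derive the same changelog topic name, so a task's state can be restored wherever that task ends up running.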

Matthias J. Sax answered Dec 10 '25 07:12


