Let's consider the following example. We have two topics, each with 4 partitions. Two consumers connect to these topics using a Key_Shared subscription.
Is there any guarantee in Pulsar that records with the same key from both topics will land on the same consumer?
Here is an illustration:
T1 P1(K1 K2) P2 P3 P4
T2 P1(K1 K2) P2 P3 P4
Messages with keys K1 and K2 are published to the same partition of topic T1 and of topic T2. Two consumers subscribe to topics T1 and T2. Is it guaranteed that Pulsar will send the same keys, across all partitions, to the same consumer? Or is this possible: C1(T1K1, T2K2), C2(T1K2, T2K1)?
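Pulsar's sticky hash ranges can help you join two topics by message key.
Let's say you have two topics, TopicA and TopicB, and you want to join them on a common message key. With the sticky Key_Shared policy, each consumer claims fixed ranges of the key hash space (0 to 65535), and the broker delivers a message to the consumer whose range contains the hash of its key. Because the hash depends only on the key, not on the topic or partition, a given key maps to the same range everywhere. Assuming both topics use the same keys, you can create one consumer per topic with the same hash ranges: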
```java
// Schema lives in org.apache.pulsar.client.api, so the wildcard import covers it
// (org.apache.pulsar.common.schema.Schema does not exist).
import org.apache.pulsar.client.api.*;

public class StickyRangeConsumers {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Consumer on TopicA that owns the hash ranges [0, 100] and [1000, 2000]
        Consumer<String> consumerA = client.newConsumer(Schema.STRING)
                .topic("TopicA")
                .subscriptionName("sub1")
                .subscriptionType(SubscriptionType.Key_Shared)
                .keySharedPolicy(KeySharedPolicy.stickyHashRange()
                        .ranges(Range.of(0, 100), Range.of(1000, 2000)))
                .subscribe();

        // Consumer on TopicB that claims exactly the same hash ranges
        Consumer<String> consumerB = client.newConsumer(Schema.STRING)
                .topic("TopicB")
                .subscriptionName("sub1")
                .subscriptionType(SubscriptionType.Key_Shared)
                .keySharedPolicy(KeySharedPolicy.stickyHashRange()
                        .ranges(Range.of(0, 100), Range.of(1000, 2000)))
                .subscribe();
    }
}
```
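Because both consumers claim the same hash ranges, a key whose hash falls in 0-100 or 1000-2000 goes to consumerA on TopicA and to consumerB on TopicB, so BOTH consumers see messages with the same keys. The same reasoning applies to the original question: if each consumer subscribes to both topics (for example with `.topics(List.of("T1", "T2"))` instead of `.topic(...)`) and claims its own fixed ranges, a key such as K1 is delivered to the same consumer from every partition of T1 and T2, so a split like C1(T1K1, T2K2) should not occur. Ranges of consumers on the same subscription must not overlap, and together they should cover the whole 0-65535 hash space; otherwise keys that hash outside every claimed range have no consumer to go to.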
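To make the routing rule concrete, here is a minimal plain-Java model of sticky hash-range selection. It is a sketch, not Pulsar code: the class `StickyRangeSimulation`, the records `HashRange` and `Message`, and the methods `keyHash` and `select` are hypothetical names, and the hash function is an assumption (`String.hashCode()` folded into 0-65535) standing in for the broker's own hash. The only property it relies on is the one described above: the chosen consumer depends on the key's hash and the consumers' fixed ranges, never on the topic or partition.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StickyRangeSimulation {
    // Key_Shared sticky ranges live in a hash space of 0..65535.
    static final int HASH_SPACE = 65536;

    // Inclusive [start, end] range, modeled after Pulsar's Range.of(start, end).
    record HashRange(int start, int end) {
        boolean contains(int hash) {
            return hash >= start && hash <= end;
        }
    }

    // A message as the dispatcher sees it: source topic, partition, and key.
    record Message(String topic, int partition, String key) {}

    // ASSUMPTION: stand-in hash, String.hashCode() folded into the hash space.
    // The real broker uses its own hash; only "depends on the key alone" matters here.
    static int keyHash(String key) {
        return Math.floorMod(key.hashCode(), HASH_SPACE);
    }

    // Returns the consumer whose ranges contain the key's hash.
    // msg.topic() and msg.partition() are deliberately ignored: only the key decides.
    // Returns null when no consumer claims the hash.
    static String select(Map<String, List<HashRange>> consumers, Message msg) {
        int hash = keyHash(msg.key());
        for (Map.Entry<String, List<HashRange>> entry : consumers.entrySet()) {
            for (HashRange range : entry.getValue()) {
                if (range.contains(hash)) {
                    return entry.getKey();
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Two consumers with non-overlapping ranges that together cover 0..65535.
        Map<String, List<HashRange>> consumers = new LinkedHashMap<>();
        consumers.put("C1", List.of(new HashRange(0, 32767)));
        consumers.put("C2", List.of(new HashRange(32768, 65535)));

        for (String topic : List.of("T1", "T2")) {
            for (int partition = 1; partition <= 4; partition++) {
                for (String key : List.of("K1", "K2")) {
                    Message msg = new Message(topic, partition, key);
                    System.out.println(topic + "-P" + partition + " " + key
                            + " -> " + select(consumers, msg));
                }
            }
        }
    }
}
```

With the stand-in hash, "K1" and "K2" hash to 2374 and 2375, so both land on C1 for every topic and partition; a key like "zzz" (hash 55610) lands on C2. If the only claimed ranges were 0-100 and 1000-2000, as in the consumer code above, `select` would return null for "K1", which is why the ranges of a sticky subscription should cover the full hash space.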