I am using Apache Camel to consume messages from a Kafka topic and process them. If an exception occurs during processing, I redirect the message to another Kafka topic and handle it in a separate route. My route looks something like this:
from("kafka1").process("someProcessor").end();
onException(Throwable.class).process(exchange -> { exchange.getIn().setBody("Message with error details"); }).to("kafka2");
The code above actually sends the error message to the same Kafka topic (kafka1).
I worked around this by calling exchange.getIn().setHeader(KafkaConstants.TOPIC, "kafka2") in the onException processor. Is this expected behavior? Why would it ignore kafka2 and use kafka1 instead?
Camel version used: 2.14.0
Kafka endpoint URLs:
Consumer:
from("kafka:" + ("kafka.broker") + "?topic="
+ ("offer.kafka.topic")
+ "&zookeeperHost=" + ("kafka.zookeeper.host")
+ "&zookeeperPort=" + ("kafka.zookeeper.port")
+ "&groupId=" + ("offer.kafka.group.id")
+ "&consumerStreams=" + ("kafka.streams")
+ "&autoCommitIntervalMs=" + ("product.kafka.consumer.auto.commit.intervals")
+ "&zookeeperConnectionTimeoutMs=" + ("zookeeper.connection.timeout")
+ "&rebalanceMaxRetries=" + ("kafka.rebalance.max.retries")
+ "&rebalanceBackoffMs=" + ("kafka.rebalance.backoffs.ms")
+ "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout")
+ "&autoOffsetReset=" + ("kafka.auto.offset.reset")
+ "&fetchMessageMaxBytes=" + ("kafka.fetch.message.max.bytes")
+ "&socketReceiveBufferBytes=" + ("receive.buffer.bytes"))
.routeId("offerEventRoute").to("direct:offerEventRoute");
Producer:
to("kafka:" + ("error.kafka.broker") + "?topic="
+ ("error.kafka.topic")
+ "&zookeeperHost=" + ("error.kafka.zookeeper.host")
+ "&zookeeperPort=" + ("error.kafka.zookeeper.port")
+ "&groupId=" + ("error.kafka.group.id")
+ "&zookeeperConnectionTimeoutMs=" + ("error.zookeeper.connection.timeout")
+ "&rebalanceMaxRetries=" + ("rebalance.max.retries")
+ "&rebalanceBackoffMs=" + ("rebalance.backoffs.ms")
+ "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout")
+ "&autoOffsetReset=" + ("auto.offset.reset")
+ "&messageSendMaxRetries=" + ("error.max.retries")
+ "&serializerClass=kafka.serializer.StringEncoder"
);
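You need to set bridgeEndpoint to true on your Kafka producer endpoint. Otherwise, the producer looks for a topic name in the exchange headers and, if one is present, uses it instead of the topic given in the endpoint URI. Here the message consumed from kafka1 still carries that topic in its headers, so the error message is sent back to kafka1.
By default, bridgeEndpoint is false.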
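The precedence described in this answer can be illustrated with a small plain-Java model. This is only a sketch of the behavior as explained above, not Camel's actual source: the class name, the resolveTopic helper, and the TOPIC_HEADER constant (a stand-in for KafkaConstants.TOPIC) are all hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of how a Kafka producer endpoint might choose its topic.
// Hypothetical illustration only; this is not code from camel-kafka.
class TopicResolutionSketch {

    // Stand-in for the header key behind KafkaConstants.TOPIC (assumed value).
    static final String TOPIC_HEADER = "kafka.TOPIC";

    // Returns the topic the producer would send to:
    // - bridgeEndpoint == false: a topic header, if present, overrides the URI topic
    // - bridgeEndpoint == true:  the header is ignored and the URI topic is used
    static String resolveTopic(Map<String, Object> headers,
                               String endpointTopic,
                               boolean bridgeEndpoint) {
        Object headerTopic = headers.get(TOPIC_HEADER);
        if (!bridgeEndpoint && headerTopic != null) {
            return headerTopic.toString();
        }
        return endpointTopic;
    }

    public static void main(String[] args) {
        // The message was consumed from kafka1, so its headers still name that topic.
        Map<String, Object> headers = new HashMap<>();
        headers.put(TOPIC_HEADER, "kafka1");

        // Default behavior: the header wins and the error goes back to kafka1.
        System.out.println(resolveTopic(headers, "kafka2", false));

        // With bridgeEndpoint=true: the endpoint's own topic, kafka2, is used.
        System.out.println(resolveTopic(headers, "kafka2", true));

        // The workaround from the question: overwrite the header before sending.
        headers.put(TOPIC_HEADER, "kafka2");
        System.out.println(resolveTopic(headers, "kafka2", false));
    }
}
```

In the real route, the equivalent change would be appending "&bridgeEndpoint=true" to the producer URI shown in the question, assuming the camel-kafka version in use supports that option. Overwriting the header, as the question already does, reaches the same result from the other direction.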