I was wondering if there is a way to catch an exception/throwable when producing a kafka message via the Kafka Template.
I don't see anything where it would throw a KafkaException if something fails. I need to find out if the message was committed to Kafka before I can continue with my application flow. I know the ListenableFuture would log a failure but I don't know how to capture that onFailure.
public void sendToKafkaTopic(KafkaMessage data) {
    ListenableFuture<SendResult<String, KafkaMessage>> future = kafkaTemplate.send(primaryKafkaTopic, data);
    future.addCallback(new ListenableFutureCallback<SendResult<String, KafkaMessage>>() {
      @Override
      public void onSuccess(SendResult<String, KafkaMessage> result) {
        log.info("sent message='{}' with offset={}", data,
            result.getRecordMetadata().offset());
      }
      @Override
      public void onFailure(Throwable ex) {
        log.error("unable to send message='{}'", data, ex);
      }
    });
  }
I was trying to find something like this:
  public void sendToKafkaTopic(KafkaMessage data) {
    try {
      kafkaTemplate.send(primaryKafkaTopic, data);
    } catch (RuntimeException err) {
      log.error("unable to send message='{}'", data, ex);
      throw new CustomException(err);
    }
  }
}
before I can continue with my application flow
There is nothing wrong if you just do Future.get(). When an exception happens downstream, it is going to be thrown to you from this blocking get().
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With