We are writing a Kafka Streams topology that aggregates data and displays it in real time. We would like to make the display as robust as possible: ideally, log the record and continue on any exception.
According to the documentation and a few tests of our own, Kafka Streams handles exceptions that occur in the producer or during deserialization very well. The provided LogAndContinueExceptionHandler gives exactly the behavior we want. However, our main problem is exceptions occurring during processing (such as in .mapValues() or .leftJoin()).
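For reference, a minimal sketch of how that handler is usually switched on through the streams configuration. The application id and broker address are hypothetical placeholders, and the property key is the one used by the Kafka Streams versions we are aware of; newer releases may name it differently, so check the configuration reference for your version.

```java
import java.util.Properties;

// Illustrative only: HandlerConfig and its values are assumptions, not part of our real setup.
final class HandlerConfig {

    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "aggregation-display");   // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092");      // hypothetical broker address
        // Log records that cannot be deserialized and keep processing instead of failing.
        props.put("default.deserialization.exception.handler",
                  "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return props;
    }
}
```

These properties would then be passed to the KafkaStreams constructor together with the built topology. Note that this only covers deserialization; it does not catch exceptions thrown inside processing functions.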
The ideas we had were basically to validate preconditions (to avoid a / by zero error, etc.). However, if there is something unforeseen in the data, an exception could still bubble up and the topology would shut down.
Kafka Streams provides an UncaughtExceptionHandler, but it is called after the thread has already died, so it cannot be used to prevent a topology shutdown.
Is there some way to write an UncaughtExceptionHandler that skips a record? Or, alternatively, is there a mechanism to skip the current record that we could call in a try-catch block inside the processing function?
I think the best solution is to write your processing operations (e.g. Mapper, Filter, etc.) in such a way that they never throw an exception. To do that, you can use a wrapper object that is either a Success or an Error (e.g. the Either type in Scala). After that, you can use the branch() method to get two streams: one for the success records and one for the errors.
The code below shows the basic idea:
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public static void main(String[] args) {
    var builder = new StreamsBuilder();
    KStream<Object, Result<Object>> stream = builder.stream("my-topic")
        .map((k, v) -> {
            try {
                // unsafe operation, i.e. one that may throw an exception
                return KeyValue.pair(k, new Success<>(v));
            } catch (Exception e) {
                return KeyValue.pair(k, new Error<>(e));
            }
        });
    // Note: branch() is deprecated in newer Kafka Streams releases in favor of split()
    KStream<Object, Result<Object>>[] branch = stream.branch((k, v) -> !v.hasError(), (k, v) -> v.hasError());
    // Handle the success stream
    KStream<Object, Result<Object>> successStream = branch[0];
    // Handle the error stream, e.g. log errors, write errors to a Dead Letter Queue
    KStream<Object, Result<Object>> errorStream = branch[1];
}
public interface Result<T> {
    T get() throws Exception;
    Exception exception();
    boolean hasError();
}

public static class Success<T> implements Result<T> {
    private final T value;

    public Success(T value) {
        this.value = value;
    }

    @Override
    public T get() throws Exception {
        return value;
    }

    @Override
    public Exception exception() {
        return null;
    }

    @Override
    public boolean hasError() {
        return false;
    }
}

public static class Error<T> implements Result<T> {
    private final Exception error;

    public Error(Exception error) {
        this.error = error;
    }

    @Override
    public T get() throws Exception {
        throw error;
    }

    @Override
    public Exception exception() {
        return error;
    }

    @Override
    public boolean hasError() {
        return true;
    }
}
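To avoid repeating the try-catch in every operation, the wrapping can be pulled into a small helper. The following is only a sketch in plain Java with no Kafka dependency: SafeOps, ThrowingFunction, Outcome, capturing and skipping are hypothetical names made up for illustration, and Outcome is a simplified stand-in for the Result type above.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper for illustration; none of these names come from Kafka Streams.
final class SafeOps {

    /** A function that is allowed to throw, unlike java.util.function.Function. */
    @FunctionalInterface
    public interface ThrowingFunction<V, R> {
        R apply(V value) throws Exception;
    }

    /** Holds either a value or the exception that prevented computing it. */
    public static final class Outcome<R> {
        public final R value;          // null when the operation failed
        public final Exception error;  // null when the operation succeeded

        private Outcome(R value, Exception error) {
            this.value = value;
            this.error = error;
        }

        public boolean hasError() {
            return error != null;
        }
    }

    /** Wraps fn so that any exception is captured in the result instead of thrown. */
    public static <V, R> Function<V, Outcome<R>> capturing(ThrowingFunction<V, R> fn) {
        return value -> {
            try {
                return new Outcome<>(fn.apply(value), null);
            } catch (Exception e) {
                return new Outcome<>(null, e);
            }
        };
    }

    /** Wraps fn so that a failing input produces an empty list, i.e. the input is skipped. */
    public static <V, R> Function<V, List<R>> skipping(ThrowingFunction<V, R> fn) {
        return value -> {
            try {
                return Collections.singletonList(fn.apply(value));
            } catch (Exception e) {
                System.err.println("Skipping value " + value + ": " + e);
                return Collections.emptyList();
            }
        };
    }
}
```

In a topology, a function built with capturing(...) could be called from inside mapValues() and followed by a split on hasError(), as in the code above. A function built with skipping(...) could be called from inside flatMapValues(), where returning an empty list simply drops the record. Which of the two fits better depends on whether you want to keep the failed records (for example for a Dead Letter Queue) or only log them.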
In addition, for the deserialization exceptions you mentioned, the Azkarra Streams project provides some convenient Java classes that can help you (e.g. SafeSerdes, DeadLetterTopicExceptionHandler); see the project on GitHub.