I am new to Flink. I am doing pattern evaluation with Flink's KeyedBroadcastProcessFunction, along similar lines to https://flink.apache.org/2019/06/26/broadcast-state.html, and I am writing my code in Java. However, I don't understand how to handle exceptions if a failure happens while processing the data stream. I searched a lot but only ended up at the two links below:
Flink: what's the best way to handle exceptions inside Flink jobs
Apache Flink - exception handling in "keyBy"
In the first link, the user says he is using a side output in a ProcessFunction to capture errors. I am also using a side output in my program to send out the data that does not match the patterns, but I don't understand how to send both errors and invalid data to the same side output.
In the second link, the user is trying to add a sink after a keyBy with a null key and a print sink function, which I didn't understand at all.
Can anyone please help me with the following?
1) Any documentation or a small code snippet for exception handling; I didn't find anything on the Flink documentation site.
2) Best practices for Flink exception handling.
I didn't find any good resources online, so an answer would also be a useful reference for others.
You can have as many side outputs from a ProcessFunction as you like -- each will have its own unique OutputTag. So you can use one for unmatched data, and another for errors.
// T stands for your record type; each tag needs a unique id.
final OutputTag<T> unmatchedTag = new OutputTag<T>("unmatched-data"){};
final OutputTag<String> errorsTag = new OutputTag<String>("side-output-for-errors"){};
SingleOutputStreamOperator<T> matchedData = ...;
DataStream<T> unmatched = matchedData.getSideOutput(unmatchedTag);
DataStream<String> errors = matchedData.getSideOutput(errorsTag);
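To make the routing itself concrete, here is a minimal sketch of what the body of processElement could look like. It is deliberately written without Flink dependencies so it runs on its own: the `Outputs` class is a hypothetical stand-in for Flink's `Collector` and for `ctx.output(tag, value)`, and `parseKey` and the "key=value" record format are assumptions made up for illustration. In a real job you would replace the three list writes with `out.collect(...)`, `ctx.output(unmatchedTag, ...)` and `ctx.output(errorsTag, ...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Dependency-free sketch of the routing done inside processElement. */
public class SideOutputRouting {

    /** Hypothetical stand-in for the main output and the two side outputs. */
    public static class Outputs {
        public final List<String> matched = new ArrayList<>();   // like out.collect(...)
        public final List<String> unmatched = new ArrayList<>(); // like ctx.output(unmatchedTag, ...)
        public final List<String> errors = new ArrayList<>();    // like ctx.output(errorsTag, ...)
    }

    /** Hypothetical parser: expects "key=value" and throws on anything else. */
    public static String parseKey(String raw) {
        int idx = raw.indexOf('=');
        if (idx <= 0) {
            throw new IllegalArgumentException("not key=value");
        }
        return raw.substring(0, idx);
    }

    /** Mirrors a processElement body: a bad record is reported, not rethrown. */
    public static void processElement(String raw, Set<String> patterns, Outputs out) {
        try {
            String key = parseKey(raw);
            if (patterns.contains(key)) {
                out.matched.add(raw);
            } else {
                out.unmatched.add(raw);
            }
        } catch (Exception e) {
            // Keep the offending input next to the message so it can be inspected later.
            out.errors.add(e.getMessage() + " | input=" + raw);
        }
    }

    public static void main(String[] args) {
        Outputs out = new Outputs();
        Set<String> patterns = Set.of("login");
        for (String raw : List.of("login=alice", "logout=bob", "garbage")) {
            processElement(raw, patterns, out);
        }
        System.out.println("matched:   " + out.matched);
        System.out.println("unmatched: " + out.unmatched);
        System.out.println("errors:    " + out.errors);
    }
}
```

The point of the try/catch is that an exception thrown out of processElement fails the task and triggers a restart, whereas a caught exception sent to a side output lets the job keep running while you still get to see the bad record.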
If you end up with several different operators each using side outputs to collect errors, then you could union them all together for reporting, which might look something like this:
final OutputTag<String> errors = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<T> task1 = ...;
SingleOutputStreamOperator<T> task2 = ...;
SingleOutputStreamOperator<T> task3 = ...;
DataStream<String> exceptions1 = task1.getSideOutput(errors);
DataStream<String> exceptions2 = task2.getSideOutput(errors);
DataStream<String> exceptions3 = task3.getSideOutput(errors);
DataStream<String> exceptions = exceptions1.union(exceptions2, exceptions3);
exceptions.addSink(new FlinkKafkaProducer<>(...));