In Flink, when two or more operators emit side outputs of the same record type, can they reuse the same OutputTag for that type?
Example:
OutputTag<A> sideOutputTag = new OutputTag<A>("side-output") {};
ProcessFunction1 processFunction1 = new ProcessFunction1(sideOutputTag);
ProcessFunction2 processFunction2 = new ProcessFunction2(sideOutputTag);
DataStream<A> output1 = input.process(processFunction1).getSideOutput(sideOutputTag);
DataStream<A> output2 = input.process(processFunction2).getSideOutput(sideOutputTag);
With this approach, will output1 also contain the records emitted by processFunction2?
Or will output1 and output2 contain only the records emitted by processFunction1 and processFunction2, respectively?
Thanks!
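You can reuse the same tag, and the resulting streams will be distinct: getSideOutput is called on a specific operator's result, so it returns only the records that operator emitted under the tag. For example: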
final OutputTag<String> errors = new OutputTag<String>("errors"){};
SingleOutputStreamOperator<Integer> task1 = ...;
SingleOutputStreamOperator<Integer> task2 = ...;
SingleOutputStreamOperator<Integer> task3 = ...;
DataStream<String> exceptions1 = task1.getSideOutput(errors);
DataStream<String> exceptions2 = task2.getSideOutput(errors);
DataStream<String> exceptions3 = task3.getSideOutput(errors);
DataStream<String> exceptions = exceptions1.union(exceptions2, exceptions3);
exceptions.addSink(new FlinkKafkaProducer<>(...));
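The snippet above leaves out the operators themselves, so here is a minimal sketch of the emitting side, assuming the Flink 1.x DataStream API (1.12 or later, for executeAndCollect) with a local execution environment. The names SharedTagDemo, FlagMultiples and FLAGGED are hypothetical and chosen only for illustration. Two instances of the same ProcessFunction write to one shared OutputTag, and each record carries a label so you can check which operator it came from.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SharedTagDemo {

    // One tag shared by both operators; the trailing {} keeps the generic type information.
    static final OutputTag<String> FLAGGED = new OutputTag<String>("flagged") {};

    // Hypothetical operator: forwards every value and additionally sends
    // multiples of `divisor` to the shared side output, prefixed with its label.
    static class FlagMultiples extends ProcessFunction<Integer, Integer> {
        private final String label;
        private final int divisor;

        FlagMultiples(String label, int divisor) {
            this.label = label;
            this.divisor = divisor;
        }

        @Override
        public void processElement(Integer value, Context ctx, Collector<Integer> out) {
            out.collect(value); // main output
            if (value % divisor == 0) {
                ctx.output(FLAGGED, label + ":" + value); // side output
            }
        }
    }

    // Builds the pipeline and returns the side-output records of each operator.
    static List<List<String>> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 5, 6);

        SingleOutputStreamOperator<Integer> first = input.process(new FlagMultiples("first", 2));
        SingleOutputStreamOperator<Integer> second = input.process(new FlagMultiples("second", 3));

        // Each executeAndCollect call runs a job and gathers up to 100 records
        // from the side output of the operator it was requested on.
        List<String> fromFirst = first.getSideOutput(FLAGGED).executeAndCollect(100);
        List<String> fromSecond = second.getSideOutput(FLAGGED).executeAndCollect(100);
        return Arrays.asList(fromFirst, fromSecond);
    }

    public static void main(String[] args) throws Exception {
        List<List<String>> result = run();
        System.out.println("side output of first:  " + result.get(0));
        System.out.println("side output of second: " + result.get(1));
    }
}
```

If the streams are distinct as described, the first list should hold only records labelled "first:" and the second only records labelled "second:", even though both operators write to the same tag. In a real job you would attach sinks and call env.execute() once instead of collecting each stream separately.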