I have an abstract class whose abstract method creates a SourceFunction, so derived classes can return simple or more complex sources (e.g. KafkaConsumers etc). ChangeMe is a simple auto-generated class created by the compilation of an AvroSchema.
public SourceFunction<ChangeMe> createSourceFunction(ParameterTool params) {
    FromElementsFunction<ChangeMe> dataSource = null;
    List<ChangeMe> changeMeList = Arrays.asList(
        ChangeMe.newBuilder().setSomeField("Some field 1").build(),
        ChangeMe.newBuilder().setSomeField("Some field 2").build(),
        ChangeMe.newBuilder().setSomeField("Some field 3").build()
    );
    try {
        dataSource = new FromElementsFunction<>(new AvroSerializer<>(ChangeMe.class), changeMeList);
    }
    catch (IOException ex) {
    }
    return dataSource;
}
In my Flink job I basically have this:
SourceFunction<ChangeMe> source = createSourceFunction(params);
DataStream<T> sourceDataStream = streamExecutionEnvironment.addSource(source);
DataStream<ChangeMe> changeMeEventsStream = this.getSourceDataStream(); // gets sourceDataStream above
changeMeEventsStream.print();
When I run the job, I get this error with respect to the call to print():
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
……
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'T' in 'class org.apache.flink.streaming.api.functions.source.FromElementsFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
I am using the Eclipse compiler, so I had thought the type information would be included (although I thought this was just for lambdas, and there are none in the above). What do I need to do to get this to run correctly?
If you want to directly instantiate a FromElementsFunction, then you have to manually provide a TypeInformation instance for the ChangeMe class when calling addSource. This is necessary for Flink to learn about the element type.
The following code snippet should do the trick:
SourceFunction<ChangeMe> source = createSourceFunction(params);
TypeInformation<ChangeMe> typeInfo = TypeInformation.of(ChangeMe.class);
DataStream<ChangeMe> sourceDataStream = env.addSource(source, typeInfo);
DataStream<ChangeMe> changeMeEventsStream = sourceDataStream;
changeMeEventsStream.print();
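To see why the explicit TypeInformation is needed, here is a small plain-Java sketch of the underlying erasure problem. It does not use Flink at all: ElementsSource and recoverElementType are hypothetical names made up for illustration, standing in for a generic source like FromElementsFunction<T> and for reflection-based type extraction in general. Flink's actual extraction logic is more involved, so treat this only as an approximation of the idea.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ErasureDemo {

    // Hypothetical stand-in for a generic source such as FromElementsFunction<T>.
    static class ElementsSource<T> { }

    // Tries to recover the element type from the object's class metadata.
    // Returns null when no concrete type argument is recorded there.
    static Type recoverElementType(Object source) {
        Type superType = source.getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            Type arg = ((ParameterizedType) superType).getActualTypeArguments()[0];
            return (arg instanceof Class) ? arg : null;
        }
        return null;
    }

    public static void main(String[] args) {
        // Direct instantiation: <String> exists only at compile time and is erased.
        ElementsSource<String> direct = new ElementsSource<String>();
        System.out.println(recoverElementType(direct));

        // Anonymous subclass: the type argument is stored in the subclass metadata.
        ElementsSource<String> subclassed = new ElementsSource<String>() { };
        System.out.println(recoverElementType(subclassed));
    }
}
```

The directly instantiated object carries no record of its type argument, which matches the situation in the question: a FromElementsFunction<ChangeMe> created with new gives reflection nothing to deduce T from, so the element type has to be passed in explicitly.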