
Type erasure & Flink: what causes the runtime error?

I have an abstract class whose abstract method creates a SourceFunction, so that derived classes can return simple or more complex sources (e.g. Kafka consumers). ChangeMe is a simple auto-generated class, produced by compiling an Avro schema.

public SourceFunction<ChangeMe> createSourceFunction(ParameterTool params) {
        FromElementsFunction<ChangeMe> dataSource = null;

        List<ChangeMe> changeMeList = Arrays.asList(
                ChangeMe.newBuilder().setSomeField("Some field 1").build(),
                ChangeMe.newBuilder().setSomeField("Some field 2").build(),
                ChangeMe.newBuilder().setSomeField("Some field 3").build()
        );
        try {
            dataSource = new FromElementsFunction<>(new AvroSerializer<>(ChangeMe.class), changeMeList);
        }
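        catch (IOException ex) {
            // Fail fast instead of swallowing the error and returning a null source.
            throw new IllegalStateException("Could not serialize the test elements", ex);
        }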

        return dataSource;
}

In my Flink job I basically have this:

SourceFunction<ChangeMe> source = createSourceFunction(params);
DataStream<T> sourceDataStream = streamExecutionEnvironment.addSource(source);


DataStream<ChangeMe> changeMeEventsStream = this.getSourceDataStream();  // gets sourceDataStream above
changeMeEventsStream.print();

When I run the job, I get this error with respect to the call to print():

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
……
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'T' in 'class org.apache.flink.streaming.api.functions.source.FromElementsFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).

I am using the Eclipse compiler, so I had thought the type information would be included (although I thought this was just for lambdas, and there are none in the above). What do I need to do to get this to run correctly?

John asked Sep 06 '25 05:09


1 Answer

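If you instantiate a FromElementsFunction directly, you have to provide a TypeInformation instance for the ChangeMe class yourself when calling addSource. FromElementsFunction<T> is generic and its type argument is erased at runtime, so Flink cannot work out the element type from the function object alone, which is exactly what the InvalidTypesException is reporting.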

The following code snippet should do the trick:

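// params is the ParameterTool from the question; env is the StreamExecutionEnvironment.
SourceFunction<ChangeMe> source = createSourceFunction(params);

// Describe the element type explicitly, since it cannot be extracted from the generic source.
TypeInformation<ChangeMe> typeInfo = TypeInformation.of(ChangeMe.class);
DataStream<ChangeMe> sourceDataStream = env.addSource(source, typeInfo);

DataStream<ChangeMe> changeMeEventsStream = sourceDataStream;
changeMeEventsStream.print();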
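
To illustrate why the explicit TypeInformation is needed, here is a minimal plain-Java sketch of the erasure problem, with no Flink involved. ElementsSource, StringSource and elementTypeOf are hypothetical names made up for this example, and the reflection lookup is only a rough approximation of the kind of check a type extractor can do, not Flink's actual logic. It shows that an instance created as `new ElementsSource<>()` carries no record of its type argument, whereas a subclass that fixes the type argument does.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

class ErasureDemo {

    /** Hypothetical stand-in for a generic source such as FromElementsFunction<T>. */
    static class ElementsSource<T> {
        final List<T> elements = new ArrayList<>();
    }

    /** A subclass that fixes T; this binding is recorded in the class file. */
    static class StringSource extends ElementsSource<String> { }

    /**
     * Returns the type argument bound in the direct superclass declaration,
     * or null if the object's class does not record one.
     */
    static Type elementTypeOf(Object source) {
        Type superType = source.getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        return null;
    }

    public static void main(String[] args) {
        // Instantiated directly: the <String> exists only at compile time.
        ElementsSource<String> direct = new ElementsSource<>();
        System.out.println(elementTypeOf(direct));             // null

        // Instantiated through a subclass that binds T: the binding survives.
        System.out.println(elementTypeOf(new StringSource())); // class java.lang.String
    }
}
```

The first case corresponds to the situation in the question: the source object is a plain generic instance, so there is nothing at runtime to read the element type from, and it has to be supplied separately.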
Till Rohrmann answered Sep 07 '25 19:09