Flink + Kafka + JSON - java example

I'm trying to read JSON from a Kafka topic with this code:

public class FlinkMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parse user parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        DataStream messageStream = env.addSource(
                new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
                , new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

        messageStream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        });

        env.execute();
    }
}

The issues are:

1) This program does not run due to:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

The problem is at line: `messageStream.map(....`

2) Maybe the above issue is related to the fact that `DataStream` has no type parameter. But if I try:

DataStream<String> messageStream = env.addSource(...

The code will not compile due to `cannot resolve constructor FlinkKafkaConsumer09 ...`
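
For context, both symptoms share one root cause: FlinkKafkaConsumer09 takes its element type from the deserialization schema, and JSONKeyValueDeserializationSchema produces Jackson's ObjectNode, not String. A minimal sketch with the types aligned (not necessarily the final fix; see the answer below):

// JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode>,
// so both the consumer and the stream must be typed to ObjectNode, not String.
DataStream<ObjectNode> messageStream = env.addSource(
        new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
        , new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

messageStream.map(new MapFunction<ObjectNode, String>() {
    @Override
    public String map(ObjectNode node) throws Exception {
        return "Kafka and Flink says: " + node.get("value");
    }
}).print();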

The pom.xml (the important part):

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>
</dependencies>

I've been looking for Flink code that uses a JSON DeserializationSchema, without success. I've only found the unit test for JSONKeyValueDeserializationSchema at this link.

Does anyone know the right way to do this?

Thanks

1 Answer

I followed Vishnu Viswanath's answer; however, `JSONKeyValueDeserializationSchema` raises an exception during the JSON parsing step, even for a simple JSON message such as `{"name":"John Doe"}`.

The code that throws is:

DataStream<ObjectNode> messageStream = env.addSource(
    new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
    , new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode node) throws Exception {
        return "Kafka and Flink says: " + node.get(0);
    }
}).print();

Output:

09/05/2016 11:16:02 Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
    at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:790)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215)
    at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:52)
    at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:38)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
    at java.lang.Thread.run(Thread.java:745)

I succeeded using another deserialization schema, JSONDeserializationSchema:

DataStream<ObjectNode> messageStream = env.addSource(
    new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
    , new JSONDeserializationSchema(), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode value) throws Exception {
        return "Kafka and Flink says: " + value.get("key").asText();
    }
}).print();
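
Note that `JSONDeserializationSchema` parses only the Kafka record value, so `value.get("key")` above only works if the JSON payload itself contains a "key" field. For a message like `{"name":"John Doe"}` from earlier, the map would read the field off the payload directly (a sketch):

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    @Override
    public String map(ObjectNode value) throws Exception {
        // JSONDeserializationSchema wraps only the record value,
        // so fields are read straight off the payload JSON.
        return "Kafka and Flink says: " + value.get("name").asText();
    }
}).print();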