I have this very simplified use case: I want to use Apache Flink (1.11) to read data from a Kafka topic (let's call it source_topic), count an attribute in it (called b) and write the result into another Kafka topic (result_topic).
I have the following code so far:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings


def log_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=env_settings)
    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
    t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.2.jar")

    source_ddl = """
        CREATE TABLE source_table(
            a STRING,
            b INT
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'source_topic',
            'properties.bootstrap.servers' = 'node-1:9092',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'csv',
            'csv.ignore-parse-errors' = 'true'
        )
        """

    sink_ddl = """
        CREATE TABLE result_table(
            b INT,
            result BIGINT
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'result_topic',
            'properties.bootstrap.servers' = 'node-1:9092',
            'format' = 'csv'
        )
        """

    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)
    t_env.execute_sql("INSERT INTO result_table SELECT b,COUNT(b) FROM source_table GROUP BY b")
    t_env.execute("Kafka_Flink_job")


if __name__ == '__main__':
    log_processing()
But when I execute it, I get the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.result_table' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[b], select=[b, COUNT(b) AS EXPR$1])
I am able to write data into a Kafka topic with a simple SELECT statement. But as soon as I add the GROUP BY clause, the exception above is thrown. I followed Flink's documentation on the use of the Table API with SQL for Python: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#sql
Any help is highly appreciated, I am very new to Stream Processing and Flink. Thank you!
A GROUP BY aggregation produces an updating stream: every new row changes the count that was already emitted for its key. The Kafka connector in Flink 1.11 only accepts append-only streams, which is why you get this exception. A simple SELECT statement without any aggregation, on the other hand, yields an append-only stream, and that is why you're able to write it to Kafka without issues.
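To make the difference concrete, here is a small pure-Python sketch (no Flink involved) of the changelog that a query like SELECT b, COUNT(b) ... GROUP BY b emits. The function name count_changelog is made up for illustration; the +I / -U / +U tags mirror the insert, update-before and update-after row kinds Flink uses for such results.

```python
def count_changelog(values):
    """Simulate the change messages of SELECT b, COUNT(b) ... GROUP BY b.

    Illustrative only: this is not Flink code, just a model of the
    insert (+I), update-before (-U) and update-after (+U) messages.
    """
    counts = {}
    changelog = []
    for b in values:
        old = counts.get(b)
        if old is None:
            # First time this key is seen: a plain insert.
            changelog.append(("+I", b, 1))
            counts[b] = 1
        else:
            # Key seen before: the previous count is retracted and replaced.
            changelog.append(("-U", b, old))
            changelog.append(("+U", b, old + 1))
            counts[b] = old + 1
    return changelog


if __name__ == "__main__":
    for change in count_changelog([1, 2, 1]):
        print(change)
```

As soon as a key repeats, the output contains -U/+U messages. An append-only sink such as the 1.11 Kafka connector has no way to represent those, so the planner rejects the query up front.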
Flink 1.12 is very close to being released, and it includes a new upsert Kafka connector (FLIP-149, if you're curious) that will allow you to do this type of operation also in PyFlink (i.e. the Python Table API).
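As a rough sketch of what the sink could look like with the 1.12 upsert connector (untested against your setup): the topic and broker address are taken from your question, while the names UPSERT_SINK_DDL and register_upsert_sink, the cnt column (renamed from result to sidestep possible reserved-word clashes) and the choice of csv for both key and value formats are my own assumptions. You would also need the matching 1.12 Kafka connector jar on the classpath.

```python
# Sketch of an upsert-kafka sink for Flink 1.12+ (assumptions noted inline).
UPSERT_SINK_DDL = """
    CREATE TABLE result_table(
        b INT,
        cnt BIGINT,                      -- renamed from `result` (assumption)
        PRIMARY KEY (b) NOT ENFORCED     -- upsert-kafka requires a primary key
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'result_topic',
        'properties.bootstrap.servers' = 'node-1:9092',
        'key.format' = 'csv',            -- format choice is an assumption
        'value.format' = 'csv'
    )
"""


def register_upsert_sink(t_env):
    """Register the sink on an existing table environment.

    `t_env` is expected to be a PyFlink StreamTableEnvironment (or any
    object with a compatible `execute_sql` method).
    """
    return t_env.execute_sql(UPSERT_SINK_DDL)
```

With such a sink registered, the INSERT INTO result_table SELECT b, COUNT(b) FROM source_table GROUP BY b statement should be accepted, because the primary key tells the connector which Kafka record each updated count replaces.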