Nico Kruber created FLINK-18769:
-----------------------------------
Summary: Streaming Table job stuck when enabling minibatching
Key: FLINK-18769
URL: https://issues.apache.org/jira/browse/FLINK-18769
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.11.1
Reporter: Nico Kruber
The following Table API streaming job gets stuck when mini-batching is enabled:
{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// mini-batching is enabled below, which makes the job hang;
// disable it completely ("false") to get a result
Configuration tableConf = tableEnv.getConfig().getConfiguration();
tableConf.setString("table.exec.mini-batch.enabled", "true");
tableConf.setString("table.exec.mini-batch.allow-latency", "5 s");
tableConf.setString("table.exec.mini-batch.size", "5000");
tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
tableEnv.executeSql(
        "CREATE TABLE input_table ("
                + "location STRING, "
                + "population INT"
                + ") WITH ("
                + "'connector' = 'kafka', "
                + "'topic' = 'kafka_batching_input', "
                + "'properties.bootstrap.servers' = 'localhost:9092', "
                + "'format' = 'csv', "
                + "'scan.startup.mode' = 'earliest-offset'"
                + ")");
tableEnv.executeSql(
        "CREATE TABLE result_table WITH ('connector' = 'print') LIKE input_table (EXCLUDING OPTIONS)");
tableEnv
        .from("input_table")
        .groupBy($("location"))
        .select(
                $("location").cast(DataTypes.CHAR(2)).as("location"),
                $("population").sum().as("population"))
        .executeInsert("result_table");
{code}
I am using a pre-populated Kafka topic called {{kafka_batching_input}} with these elements:
{code}
"Berlin",1
"Berlin",2
{code}
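As a sanity check on what the job should eventually emit, the group-by/sum over the two sample records can be reproduced in plain Java without Flink or Kafka. This is only a sketch. The class name {{ExpectedAggregate}} and the method {{sumByLocation}} are hypothetical and not part of the job. The {{CHAR(2)}} cast is deliberately left out because its effect on the {{location}} value is not stated in this report, and the exact row format printed by the {{print}} connector is not modelled either.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Flink job: sums "population" per "location"
// for CSV lines shaped like the sample records in the topic.
public class ExpectedAggregate {

    static Map<String, Integer> sumByLocation(List<String> csvLines) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String line : csvLines) {
            // Split on the last comma: the quoted location comes first, the INT second.
            int comma = line.lastIndexOf(',');
            String location = line.substring(0, comma).replace("\"", "");
            int population = Integer.parseInt(line.substring(comma + 1).trim());
            totals.merge(location, population, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList("\"Berlin\",1", "\"Berlin\",2");
        System.out.println(sumByLocation(sample)); // prints {Berlin=3}
    }
}
```

With the two records above, the summed population for Berlin is 3, so a non-stuck job should end up with that total for the single group.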