[jira] [Created] (FLINK-18769) Streaming Table job stuck when enabling minibatching


Shang Yuanchun (Jira)
Nico Kruber created FLINK-18769:
-----------------------------------

             Summary: Streaming Table job stuck when enabling minibatching
                 Key: FLINK-18769
                 URL: https://issues.apache.org/jira/browse/FLINK-18769
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.11.1
            Reporter: Nico Kruber


The following Table API streaming job gets stuck when mini-batching is enabled:

{code}
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

    // mini-batching is enabled here; the job only produces a result when it is disabled completely
    Configuration tableConf = tableEnv.getConfig()
        .getConfiguration();
    tableConf.setString("table.exec.mini-batch.enabled", "true");
    tableConf.setString("table.exec.mini-batch.allow-latency", "5 s");
    tableConf.setString("table.exec.mini-batch.size", "5000");
    tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

    tableEnv.executeSql(
        "CREATE TABLE input_table ("
            + "location STRING, "
            + "population INT"
            + ") WITH ("
            + "'connector' = 'kafka', "
            + "'topic' = 'kafka_batching_input', "
            + "'properties.bootstrap.servers' = 'localhost:9092', "
            + "'format' = 'csv', "
            + "'scan.startup.mode' = 'earliest-offset'"
            + ")");

    tableEnv.executeSql(
        "CREATE TABLE result_table WITH ('connector' = 'print') LIKE input_table (EXCLUDING OPTIONS)");

    tableEnv
        .from("input_table")
        .groupBy($("location"))
        .select($("location").cast(DataTypes.CHAR(2)).as("location"), $("population").sum().as("population"))
        .executeInsert("result_table");
{code}

I am using a pre-populated Kafka topic called {{kafka_batching_input}} with these elements:
{code}
"Berlin",1
"Berlin",2
{code}
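
For reference, the aggregate that a working job would be expected to converge to for these two records can be sketched in plain Java, without Flink. This is only an illustration: the class and method names ({{ExpectedAggregate}}, {{sumByLocation}}) are hypothetical, it ignores the {{CHAR(2)}} cast in the query, and it does not model the changelog rows that the print connector would emit along the way.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpectedAggregate {

    // Sums the population per location for CSV lines of the form "location",population.
    // Hypothetical helper, not part of the reported job.
    static Map<String, Integer> sumByLocation(List<String> csvLines) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String line : csvLines) {
            int comma = line.lastIndexOf(',');
            // Strip the surrounding double quotes from the location field.
            String location = line.substring(0, comma).replace("\"", "");
            int population = Integer.parseInt(line.substring(comma + 1).trim());
            totals.merge(location, population, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // The two records from the kafka_batching_input topic.
        List<String> records = List.of("\"Berlin\",1", "\"Berlin\",2");
        System.out.println(sumByLocation(records)); // prints {Berlin=3}
    }
}
```

With mini-batching enabled as configured above, the reported job gets stuck instead of producing a result.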



--
This message was sent by Atlassian Jira
(v8.3.4#803005)