[jira] [Created] (FLINK-18626) the result of aggregate SQL on streaming cannot write to upsert table sink

Shang Yuanchun (Jira)
gaoling ma created FLINK-18626:
----------------------------------

             Summary: the result of aggregate SQL on streaming cannot write to upsert table sink
                 Key: FLINK-18626
                 URL: https://issues.apache.org/jira/browse/FLINK-18626
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / JDBC, Table SQL / API
    Affects Versions: 1.11.0
            Reporter: gaoling ma



{code:java}
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
bsEnv.setParallelism(1);
......
bsTableEnv.executeSql("CREATE TABLE aaa(\n" +
                "    `area_code` VARCHAR,\n" +
                "    `stat_date` DATE,\n" +
                "    `index` BIGINT,\n" +
                "    PRIMARY KEY (area_code, stat_date) NOT ENFORCED" +
                ") WITH (\n" +
                "  'connector'  = 'jdbc',\n" +
                "  'url'        = 'jdbc:mysql://***/laowufp_data_test',\n" +
                "  'table-name' = 'aaa',\n" +
                "  'driver'     = 'com.mysql.cj.jdbc.Driver',\n" +
                "  'username'   = '***',\n" +
                "  'password'   = '***'\n" +
                ")");
               
bsTableEnv.executeSql("INSERT INTO aaa SELECT area_code, CURRENT_DATE AS stat_date, count(*) AS index FROM bbb WHERE is_record = '是' GROUP BY area_code");
{code}
When I write the results of the aggregate SQL query to an upsert JDBC table sink, the program exits on its own without any error message or hint.
The aggregate results are supposed to be a retract stream, so another question is how to convert the retract stream into an upsert stream. Or is there a better way to continuously update the aggregate SQL results in the JDBC table? Your comments are appreciated.
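
To make the retract-versus-upsert question concrete, here is a minimal plain-Java sketch of the idea. It does not use Flink or reflect how the JDBC sink is implemented. It only models a `COUNT(*) ... GROUP BY area_code` changelog as retract messages and shows how those messages could be folded into a table keyed by `area_code`. The class `RetractToUpsertSketch`, the `Change` type, and both method names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not Flink API and not the JDBC connector's implementation.
final class RetractToUpsertSketch {

    /** One change message: isAdd == true adds a row, isAdd == false retracts a previously emitted row. */
    static final class Change {
        final boolean isAdd;
        final String areaCode;
        final long count;

        Change(boolean isAdd, String areaCode, long count) {
            this.isAdd = isAdd;
            this.areaCode = areaCode;
            this.count = count;
        }
    }

    /** Models the retract-style changelog of COUNT(*) grouped by area code. */
    static List<Change> retractChangelog(List<String> areaCodes) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String code : areaCodes) {
            Long old = counts.get(code);
            if (old != null) {
                // The previous result for this key is no longer valid: retract it first.
                out.add(new Change(false, code, old));
            }
            long updated = (old == null ? 0L : old) + 1;
            counts.put(code, updated);
            out.add(new Change(true, code, updated));
        }
        return out;
    }

    /** Folds the changelog into a table keyed by area code, one row per key. */
    static Map<String, Long> applyAsUpsert(List<Change> changes) {
        Map<String, Long> table = new TreeMap<>();
        for (Change c : changes) {
            if (c.isAdd) {
                // Insert the row, or overwrite the existing row with the same key.
                table.put(c.areaCode, c.count);
            } else {
                // Delete the row only if it still holds the retracted value.
                table.remove(c.areaCode, c.count);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> log = retractChangelog(Arrays.asList("A", "B", "A"));
        System.out.println(log.size() + " change messages");
        System.out.println(applyAsUpsert(log));
    }
}
```

In this model, the key used by `applyAsUpsert` plays the role of the primary key declared on the sink table: a retraction followed by an add for the same key leaves a single, updated row, which is the behavior the question asks for.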



--
This message was sent by Atlassian Jira
(v8.3.4#803005)