[jira] [Created] (FLINK-18070) Time attribute been materialized after sub graph optimize

Shang Yuanchun (Jira)
YufeiLiu created FLINK-18070:
--------------------------------

             Summary: Time attribute been materialized after sub graph optimize
                 Key: FLINK-18070
                 URL: https://issues.apache.org/jira/browse/FLINK-18070
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.10.0
            Reporter: YufeiLiu


Hi, I want to use a window aggregate on a temporary view that feeds multiple sinks, but the planner throws an exception:

{code:java}
java.lang.AssertionError: type mismatch:
ref:
TIME ATTRIBUTE(PROCTIME) NOT NULL
input:
TIMESTAMP(3) NOT NULL
{code}

I looked into the optimizer logic; there is a comment in {{CommonSubGraphBasedOptimizer}}:
"1. In general, for multi-sinks users tend to use VIEW which is a natural common sub-graph."

After sub-graph optimization, the time attribute from the source has been converted to a plain TIMESTAMP type by {{FlinkRelTimeIndicatorProgram}}. But the query behind my view is a simple {{SELECT *}}, so in theory I don't think the time attribute needs to be materialized.
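
To make the failure mode concrete, here is a toy model in plain Java. It is not Flink or Calcite code: the class {{TimeAttributeMismatchSketch}}, the enum {{FieldType}}, and the methods {{materialize}} and {{checkRef}} are hypothetical names invented for illustration. It only mimics what I believe happens: the window expression still refers to {{ts}} as a proctime attribute, while the shared sub-graph hands it over as a plain timestamp.

```java
// Toy model only: none of these types exist in Flink or Calcite.
class TimeAttributeMismatchSketch {

    /** Hypothetical stand-in for the two field types named in the exception. */
    enum FieldType {
        PROCTIME_ATTRIBUTE("TIME ATTRIBUTE(PROCTIME) NOT NULL"),
        TIMESTAMP_3("TIMESTAMP(3) NOT NULL");

        final String display;

        FieldType(String display) {
            this.display = display;
        }
    }

    /** Assumed effect of materialization: a time attribute becomes a plain timestamp. */
    static FieldType materialize(FieldType type) {
        return type == FieldType.PROCTIME_ATTRIBUTE ? FieldType.TIMESTAMP_3 : type;
    }

    /** Mimics a check that a field reference has the same type as the input field. */
    static String checkRef(FieldType ref, FieldType input) {
        if (ref == input) {
            return "ok";
        }
        return "type mismatch:\nref:\n" + ref.display + "\ninput:\n" + input.display;
    }

    public static void main(String[] args) {
        // Type that TUMBLE(`ts`, ...) was resolved against in the original query.
        FieldType ref = FieldType.PROCTIME_ATTRIBUTE;

        // Single sink: the view is inlined and the attribute is passed through unchanged.
        System.out.println(checkRef(ref, FieldType.PROCTIME_ATTRIBUTE));

        // Two sinks: the view becomes a shared sub-graph whose output is materialized.
        System.out.println(checkRef(ref, materialize(FieldType.PROCTIME_ATTRIBUTE)));
    }
}
```

In this sketch the second check reports the same ref/input pair as the exception above, which is why I suspect the materialization happens at the boundary of the shared sub-graph rather than in the window aggregate itself.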

Here is my code:

{code:java}
// connector.type COLLECTION is for debug use
tableEnv.sqlUpdate("CREATE TABLE source (\n" +
        "    `ts` AS PROCTIME(),\n" +
        "    `order_type` INT\n" +
        ") WITH (\n" +
        "    'connector.type' = 'COLLECTION',\n" +
        "    'format.type' = 'json'\n" +
        ")\n");
tableEnv.createTemporaryView("source_view", tableEnv.sqlQuery("SELECT * FROM source"));
tableEnv.sqlUpdate("CREATE TABLE sink (\n" +
        "    `result` BIGINT\n" +
        ") WITH (\n" +
        "    'connector.type' = 'COLLECTION',\n" +
        "    'format.type' = 'json'\n" +
        ")\n");
tableEnv.sqlUpdate("INSERT INTO sink \n" +
        "SELECT\n" +
        "    COUNT(1)\n" +
        "FROM\n" +
        "    `source_view`\n" +
        "WHERE\n" +
        "     `order_type` = 33\n" +
        "GROUP BY\n" +
        "    TUMBLE(`ts`, INTERVAL '5' SECOND)\n");
tableEnv.sqlUpdate("INSERT INTO sink \n" +
        "SELECT\n" +
        "    COUNT(1)\n" +
        "FROM\n" +
        "    `source_view`\n" +
        "WHERE\n" +
        "     `order_type` = 34\n" +
        "GROUP BY\n" +
        "    TUMBLE(`ts`, INTERVAL '5' SECOND)\n");
{code}