Jark Wu created FLINK-21290:
-------------------------------

Summary: Support Projection push down for Window TVF
Key: FLINK-21290
URL: https://issues.apache.org/jira/browse/FLINK-21290
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Jark Wu

{code:scala}
@Test
def testTumble_ProjectionPushDown(): Unit = {
  // TODO: [b, c, e, proctime] are never used and should be pruned
  val sql =
    """
      |SELECT
      |  a,
      |  window_start,
      |  window_end,
      |  count(*),
      |  sum(d)
      |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
      |GROUP BY a, window_start, window_end
    """.stripMargin
  util.verifyRelPlan(sql)
}
{code}

For the above test, we currently get the following plan:

{code}
Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4])
+- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[a]])
      +- Calc(select=[a, d, rowtime])
         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
            +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
               +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
{code}

The planner should be able to prune the unused fields and produce the following plan:

{code}
Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4])
+- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[a]])
      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
         +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, d, rowtime])
{code}

The reason is that we don't transpose the Project and the WindowTableFunction in the logical phase.
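Transposing the Project and the window TVF mostly comes down to index bookkeeping: the Project above the TVF references TVF output fields, and the window columns (window_start, window_end, window_time) are appended after all input fields. The following is a minimal sketch of that bookkeeping, assuming a standalone model rather than Calcite or Flink planner code; `WindowTvfPruning`, `prune` and `Pruned` are hypothetical names. It keeps the input fields referenced above the TVF plus the DESCRIPTOR time column (which the TVF itself needs even if nothing above uses it), then renumbers the Project's references against the pruned TVF output.

```scala
// Hypothetical, self-contained model of pruning fields through a window TVF.
// It does NOT use Calcite/Flink planner classes; the Int indices mirror the
// $-references in LogicalProject / LogicalTableFunctionScan plans.
object WindowTvfPruning {

  /** Input fields to keep, and the Project's references rewritten for the pruned TVF. */
  final case class Pruned(keptInputFields: Seq[Int], rewrittenProjectRefs: Seq[Int])

  /**
   * @param inputFieldCount number of fields produced by the TVF's input
   * @param timeColIndex    index of the DESCRIPTOR time column within the input
   * @param windowColCount  number of columns appended by the TVF
   *                        (window_start, window_end, window_time)
   * @param projectRefs     TVF output indices referenced by the Project above it
   */
  def prune(
      inputFieldCount: Int,
      timeColIndex: Int,
      windowColCount: Int,
      projectRefs: Seq[Int]): Pruned = {
    require(timeColIndex >= 0 && timeColIndex < inputFieldCount, "time column must be an input field")
    require(projectRefs.forall(r => r >= 0 && r < inputFieldCount + windowColCount),
      "project reference out of range")

    // Input fields used above the TVF, plus the time column the TVF needs itself.
    val kept: Seq[Int] =
      (projectRefs.filter(_ < inputFieldCount) :+ timeColIndex).distinct.sorted

    // Old input index -> new input index after pruning.
    val inputMapping: Map[Int, Int] = kept.zipWithIndex.toMap

    // In the pruned TVF output, window columns directly follow the kept input fields.
    val rewritten: Seq[Int] = projectRefs.map { ref =>
      if (ref < inputFieldCount) inputMapping(ref)
      else kept.size + (ref - inputFieldCount)
    }
    Pruned(kept, rewritten)
  }
}

object WindowTvfPruningDemo {
  def main(args: Array[String]): Unit = {
    // Mirrors the logical plan of the test: input (a, b, c, d, e, rowtime, proctime),
    // DESCRIPTOR(rowtime) = $5, Project(a=[$0], window_start=[$7], window_end=[$8], d=[$3]).
    val result = WindowTvfPruning.prune(
      inputFieldCount = 7, timeColIndex = 5, windowColCount = 3,
      projectRefs = Seq(0, 7, 8, 3))
    // Kept input fields are 0, 3, 5 (a, d, rowtime); the Project becomes $0, $3, $4, $1.
    println(result)
  }
}
```

In this model, b, c, e and proctime drop out because nothing above the TVF references them, while rowtime survives only because of the DESCRIPTOR. In the planner, such a rewrite would be expressed as a logical rule that pushes a narrowed Project below the window TVF and keeps a remapping Project above it. For reference, the current logical plan, in which the LogicalProject stays above the LogicalTableFunctionScan, is: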
{code}
LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)])
+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3])
   +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
               +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)