Benoit Hanotte created FLINK-15577:
--------------------------------------

             Summary: LogicalWindowAggregate Rel nodes missing Window specs in digest
                 Key: FLINK-15577
                 URL: https://issues.apache.org/jira/browse/FLINK-15577
             Project: Flink
          Issue Type: Bug
            Reporter: Benoit Hanotte

The RelNode's digest (AbstractRelNode.getDigest()), along with its RowType, is used by the Calcite HepPlanner to avoid adding duplicate vertices to the graph. If an equivalent vertex is already present in the graph, that vertex is used in place of the newly generated one:
https://github.com/apache/calcite/blob/branch-1.21/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java#L828

This means that *the digest needs to contain all the information necessary to identify a vertex and distinguish it from similar - but not equivalent - vertices*.

In the case of `LogicalWindowAggregate` and `FlinkLogicalWindowAggregate`, the window specs are currently not in the digest. Two aggregations with the same signatures and expressions but different windows are therefore considered equivalent by the planner, which is incorrect and leads to an invalid physical plan.
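The effect can be illustrated with a minimal, self-contained sketch. It is not Calcite or Flink code: `WindowAgg`, `digestWithoutWindow`, `digestWithWindow` and `countDistinctVertices` are hypothetical names, the digest strings are made up, and the RowType part of the key is left out because it is identical for both nodes. The sketch only mimics the idea of a planner that keeps one vertex per digest.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DigestDedupSketch {

    /** Hypothetical stand-in for a window aggregate node; not a Calcite or Flink class. */
    static final class WindowAgg {
        final String input;
        final String select;
        final String window;

        WindowAgg(String input, String select, String window) {
            this.input = input;
            this.select = select;
            this.window = window;
        }

        /** Digest that omits the window spec (the situation described in this issue). */
        String digestWithoutWindow() {
            return "WindowAggregate(input=" + input + ", select=" + select + ")";
        }

        /** Digest that also carries the window spec. */
        String digestWithWindow() {
            return "WindowAggregate(input=" + input + ", window=" + window
                    + ", select=" + select + ")";
        }
    }

    /**
     * Registers nodes keyed by digest and keeps the first node seen for each digest,
     * then returns how many distinct vertices remain.
     */
    static int countDistinctVertices(boolean includeWindow, WindowAgg... nodes) {
        Map<String, WindowAgg> byDigest = new LinkedHashMap<>();
        for (WindowAgg node : nodes) {
            String digest = includeWindow
                    ? node.digestWithWindow()
                    : node.digestWithoutWindow();
            byDigest.putIfAbsent(digest, node); // an existing vertex wins over the new one
        }
        return byDigest.size();
    }

    public static void main(String[] args) {
        WindowAgg oneHour = new WindowAgg("my_table", "w$rowtime", "size=1h, slide=1h");
        WindowAgg twoHour = new WindowAgg("my_table", "w$rowtime", "size=2h, slide=1h");

        // Without the window in the digest, both aggregations collapse into one vertex.
        System.out.println(countDistinctVertices(false, oneHour, twoHour));
        // With the window in the digest, they stay separate.
        System.out.println(countDistinctVertices(true, oneHour, twoHour));
    }
}
```

In this sketch the two aggregations differ only in their window, so a digest that omits the window makes the second node indistinguishable from the first.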
For instance, the following query would give an invalid plan:

{code}
WITH window_1h AS (
    SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) as `timestamp`
    FROM my_table
    GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
),
window_2h AS (
    SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
    FROM my_table
    GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
)
(SELECT * FROM window_1h)
UNION ALL
(SELECT * FROM window_2h)
{code}

The invalid plan generated by the planner is the following (*Please note the windows in the DataStreamGroupWindowAggregate being the same when they should be different*):

{code}
DataStreamUnion(all=[true], union all=[timestamp]): rowcount = 200.0, cumulative cost = {800.0 rows, 802.0 cpu, 0.0 io}, id = 176
  DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 173
    DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 172
      DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
  DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 175
    DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 174
      DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)