[jira] [Created] (FLINK-12192) Add support for generating optimized logical plan for grouping sets and distinct aggregate


Shang Yuanchun (Jira)
godfrey he created FLINK-12192:
----------------------------------

             Summary: Add support for generating optimized logical plan for grouping sets and distinct aggregate
                 Key: FLINK-12192
                 URL: https://issues.apache.org/jira/browse/FLINK-12192
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / Planner
            Reporter: godfrey he
            Assignee: godfrey he


This issue aims to support generating optimized logical plans for grouping sets and distinct aggregates (mentioned in [FLINK-12076|https://issues.apache.org/jira/browse/FLINK-12076] and [FLINK-12098|https://issues.apache.org/jira/browse/FLINK-12098]).

For batch, a query with distinct aggregates is rewritten into two non-distinct aggregates by an extended [AggregateExpandDistinctAggregatesRule|https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/rules/AggregateExpandDistinctAggregatesRule.java]. The first aggregate computes the distinct result and the non-distinct aggregate function results; the second aggregate computes the distinct aggregate function results from the output of the first aggregate. The first aggregate has grouping sets if there is more than one distinct aggregate on different fields.
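
The single-distinct case can be illustrated with a small Python simulation. This is only a sketch of the idea, not the rule's implementation: the table, its columns, the sample rows, and the helpers `direct` and `two_phase` are all hypothetical. The assumed query is `SELECT a, COUNT(DISTINCT b), SUM(c) FROM T GROUP BY a`, and NULL values in b are assumed not to occur.

```python
from collections import defaultdict

# Hypothetical rows (a, b, c) for the assumed query
#   SELECT a, COUNT(DISTINCT b), SUM(c) FROM T GROUP BY a
ROWS = [(1, 10, 5), (1, 10, 7), (1, 20, 1), (2, 30, 4), (2, 30, 6)]


def direct(rows):
    """Reference result: evaluate the distinct aggregate directly."""
    distinct_b = defaultdict(set)
    sum_c = defaultdict(int)
    for a, b, c in rows:
        distinct_b[a].add(b)
        sum_c[a] += c
    return {a: (len(distinct_b[a]), sum_c[a]) for a in distinct_b}


def two_phase(rows):
    """Same result using two non-distinct aggregates."""
    # First aggregate: GROUP BY a, b. Grouping on b de-duplicates it,
    # and SUM(c) is computed as a partial result per (a, b).
    first = defaultdict(int)
    for a, b, c in rows:
        first[(a, b)] += c
    # Second aggregate: GROUP BY a. A plain COUNT over the de-duplicated
    # b values gives COUNT(DISTINCT b); summing the partials gives SUM(c).
    count_b = defaultdict(int)
    sum_c = defaultdict(int)
    for (a, _b), partial in first.items():
        count_b[a] += 1
        sum_c[a] += partial
    return {a: (count_b[a], sum_c[a]) for a in count_b}


print(two_phase(ROWS))
```

Both functions return the same mapping for the sample rows. With several distinct aggregates on different fields, one first-phase grouping key is no longer enough, which is where the grouping sets mentioned above come in.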

For stream, a query with distinct aggregates is handled by SplitAggregateRule in [FLINK-12161|https://issues.apache.org/jira/browse/FLINK-12161].

A query with grouping sets (or cube, rollup) is rewritten into a regular aggregate on top of an expand node.
The expand node duplicates each input record once for each simple group.
E.g.
{noformat}
schema:
MyTable: a: INT, b: BIGINT, c: VARCHAR(32), d: VARCHAR(32)

original records:
+-----+-----+-----+-----+
|  a  |  b  |  c  |  d  |
+-----+-----+-----+-----+
|  1  |  1  |  c1 |  d1 |
+-----+-----+-----+-----+
|  1  |  2  |  c1 |  d2 |
+-----+-----+-----+-----+
|  2  |  1  |  c1 |  d1 |
+-----+-----+-----+-----+

SELECT a, c, SUM(b) as b FROM MyTable GROUP BY GROUPING SETS (a, c)

logical plan after expansion:
LogicalCalc(expr#0..3=[{inputs}], proj#0..1=[{exprs}], b=[$t3])
    LogicalAggregate(group=[{0, 2, 3}], groups=[[]], b=[SUM($1)])
        LogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[null], b=[$1], c=[$2], $e=[2]}])
            LogicalNativeTableScan(table=[[builtin, default, MyTable]])

notes:
'$e = 1' is equivalent to 'group by a'
'$e = 2' is equivalent to 'group by c'

expanded records:
+-----+-----+-----+-----+
|  a  |  b  |  c  | $e  |
+-----+-----+-----+-----+        ---+---
|  1  |  1  | null|  1  |           |
+-----+-----+-----+-----+  records expanded by record1
| null|  1  |  c1 |  2  |           |
+-----+-----+-----+-----+        ---+---
|  1  |  2  | null|  1  |           |
+-----+-----+-----+-----+  records expanded by record2
| null|  2  |  c1 |  2  |           |
+-----+-----+-----+-----+        ---+---
|  2  |  1  | null|  1  |           |
+-----+-----+-----+-----+  records expanded by record3
| null|  1  |  c1 |  2  |           |
+-----+-----+-----+-----+        ---+---
{noformat}
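
The expand-then-aggregate rewrite in the example can be traced with a short Python simulation. It is a sketch for illustration only, not Flink planner code: the function names `expand`, `aggregate`, and `project` are hypothetical stand-ins for the LogicalExpand, LogicalAggregate, and LogicalCalc nodes in the plan, and the rows are those of MyTable from the example.

```python
from collections import defaultdict

# Rows of MyTable (a, b, c, d), taken from the example.
ROWS = [(1, 1, "c1", "d1"), (1, 2, "c1", "d2"), (2, 1, "c1", "d1")]


def expand(rows):
    """Emit one copy of each row per grouping set, tagged with $e."""
    out = []
    for a, b, c, _d in rows:
        out.append((a, b, None, 1))  # $e = 1: grouping set (a), c set to null
        out.append((None, b, c, 2))  # $e = 2: grouping set (c), a set to null
    return out


def aggregate(expanded):
    """Regular aggregate: GROUP BY a, c, $e computing SUM(b)."""
    sums = defaultdict(int)
    for a, b, c, e in expanded:
        sums[(a, c, e)] += b
    return dict(sums)


def project(agg):
    """Final projection: drop $e, keep a, c and the sum as b."""
    return [(a, c, total) for (a, c, _e), total in agg.items()]


result = project(aggregate(expand(ROWS)))
print(result)
```

Grouping on $e together with a and c keeps the rows of different grouping sets apart, so a single regular aggregate yields the result for every grouping set.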







--
This message was sent by Atlassian JIRA
(v7.6.3#76005)