godfrey he created FLINK-12575:
----------------------------------

             Summary: Introduce planner rules to remove redundant shuffle and collation
                 Key: FLINK-12575
                 URL: https://issues.apache.org/jira/browse/FLINK-12575
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / Planner
            Reporter: godfrey he
            Assignee: godfrey he


{{Exchange}} and {{Sort}} are the heaviest operators. They are created by {{FlinkExpandConversionRule}} when an operator in the planner rules requires its inputs to satisfy a distribution trait or a collation trait. However, many operators already provide a distribution or collation: e.g. {{BatchExecHashAggregate}} and {{BatchExecHashJoin}} can provide distribution on their shuffle keys, and {{BatchExecSortMergeJoin}} can provide distribution and collation on its join keys. If the provided traits satisfy the required traits, the {{Exchange}} or the {{Sort}} is redundant.

e.g.
{code:sql}
schema:
x: a int, b bigint, c varchar
y: d int, e bigint, f varchar
t1: a1 int, b1 bigint, c1 varchar
t2: d1 int, e1 bigint, f1 varchar

sql:
select * from x join y on a = d and b = e
join t1 on d = a1 and e = b1
left outer join t2 on a1 = d1 and b1 = e1

the physical plan after redundant Exchange and Sort are removed:
SortMergeJoin(joinType=[LeftOuterJoin], where=[AND(=(a1, d1), =(b1, e1))], leftSorted=[true], ...)
:- SortMergeJoin(joinType=[InnerJoin], where=[AND(=(d, a1), =(e, b1))], leftSorted=[true], ...)
:  :- SortMergeJoin(joinType=[InnerJoin], where=[AND(=(a, d), =(b, e))], ...)
:  :  :- Exchange(distribution=[hash[a, b]])
:  :  :  +- TableSourceScan(table=[[x]], ...)
:  :  +- Exchange(distribution=[hash[d, e]])
:  :     +- TableSourceScan(table=[[y]], ...)
:  +- Exchange(distribution=[hash[a1, b1]])
:     +- TableSourceScan(table=[[t1]], ...)
+- Exchange(distribution=[hash[d1, e1]])
   +- TableSourceScan(table=[[t2]], ...)
{code}

In the above physical plan, the {{Exchange}}s between the {{SortMergeJoin}}s are redundant because their shuffle keys are equivalent through the join conditions, and the {{Sort}}s on the left-hand side of the top two {{SortMergeJoin}}s are redundant because their inputs are already sorted.

Another case is that shuffles and collations can be removed between multiple {{Over}}s.

e.g.
{code:sql}
schema:
MyTable: a int, b int, c varchar

sql:
SELECT
    COUNT(*) OVER (PARTITION BY c ORDER BY a),
    SUM(a) OVER (PARTITION BY b ORDER BY a),
    RANK() OVER (PARTITION BY c ORDER BY a, c),
    SUM(a) OVER (PARTITION BY b ORDER BY a),
    COUNT(*) OVER (PARTITION BY c ORDER BY c)
FROM MyTable

the physical plan after redundant Exchange and Sort are removed:
Calc(select=[...])
+- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*) AS w3$o0 RANG ...])
   +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w1$o0 RANG ...], window#1=[RANK(*) AS w2$o0 RANG ...], ...)
      +- Sort(orderBy=[c ASC, a ASC])
         +- Exchange(distribution=[hash[c]])
            +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w1$o1, $SUM0(a) AS w0$o0 RANG ...], ...)
               +- Sort(orderBy=[b ASC, a ASC])
                  +- Exchange(distribution=[hash[b]])
                     +- TableSourceScan(table=[[MyTable]], ...)
{code}

The {{Exchange}} and {{Sort}} between the top two {{OverAggregate}}s are redundant because their shuffle keys are the same (hash[c]) and the sort keys required by the top one (c ASC) are a prefix of the sort keys its input already provides (c ASC, a ASC).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
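To make the redundancy check concrete, here is a minimal Java (16+, for records) sketch of the trait-satisfaction test the proposed rules depend on. It is a toy model under stated assumptions, not Flink's or Calcite's API: the class TraitSatisfaction and its nested types are hypothetical, only hash distribution is modeled, and hash keys are compared by exact column name. A provided hash distribution satisfies a required one when the keys match, and a provided collation satisfies a required one when the required sort keys are a prefix of the provided ones. The main method encodes the Over example: the middle {{OverAggregate}} outputs hash[c] sorted on (c ASC, a ASC), and the top one requires hash[c] sorted on (c ASC).

{code:java}
import java.util.List;

/**
 * Hypothetical toy model (not Flink's or Calcite's API): an Exchange or Sort
 * is redundant when the traits its input already provides satisfy the traits
 * its parent requires.
 */
final class TraitSatisfaction {

    /** Hash distribution on an ordered list of column names. */
    record HashDistribution(List<String> keys) {
        /** Simplification: keys must match exactly, by name and position. */
        boolean satisfies(HashDistribution required) {
            return keys.equals(required.keys());
        }
    }

    /** One sort key: a column name plus a direction. */
    record FieldCollation(String field, boolean ascending) {}

    /** Ordered list of sort keys. */
    record Collation(List<FieldCollation> fields) {
        /** Data sorted on (c, a) is also sorted on (c): the required keys must be a prefix. */
        boolean satisfies(Collation required) {
            List<FieldCollation> req = required.fields();
            return req.size() <= fields.size()
                && fields.subList(0, req.size()).equals(req);
        }
    }

    /** Physical traits provided by an operator's output, or required of an input. */
    record Traits(HashDistribution distribution, Collation collation) {}

    /** The Exchange can be dropped if the provided distribution already satisfies the required one. */
    static boolean exchangeRedundant(Traits provided, Traits required) {
        return provided.distribution().satisfies(required.distribution());
    }

    /** The Sort can be dropped if the provided collation already satisfies the required one. */
    static boolean sortRedundant(Traits provided, Traits required) {
        return provided.collation().satisfies(required.collation());
    }

    static FieldCollation asc(String field) {
        return new FieldCollation(field, true);
    }

    static Traits traits(List<String> hashKeys, List<FieldCollation> sortKeys) {
        return new Traits(new HashDistribution(hashKeys), new Collation(sortKeys));
    }

    public static void main(String[] args) {
        // Output of the middle OverAggregate: hash[c], sorted on (c ASC, a ASC).
        Traits provided = traits(List.of("c"), List.of(asc("c"), asc("a")));
        // Requirement of the top OverAggregate (PARTITION BY c ORDER BY c).
        Traits required = traits(List.of("c"), List.of(asc("c")));
        System.out.println(exchangeRedundant(provided, required)); // true
        System.out.println(sortRedundant(provided, required));     // true
    }
}
{code}

Checking the bottom {{OverAggregate}}'s output (hash[b], sorted on b ASC, a ASC) against the middle one's requirement (hash[c], sorted on c ASC, a ASC) makes both methods return false, so that {{Exchange}} and {{Sort}} are kept. For the join example, the toy would also need to map keys through the equi-join conditions (a = d, b = e) before comparing, since hash[a, b] and hash[d, e] match only by equivalence, not by name.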