Caizhi Weng created FLINK-19939:
-----------------------------------

             Summary: Remove redundant union from multiple input node
                 Key: FLINK-19939
                 URL: https://issues.apache.org/jira/browse/FLINK-19939
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Caizhi Weng
             Fix For: 1.12.0

Consider the following SQL and its execution plan.

{code:sql}
WITH
  T1 AS (SELECT COUNT(*) AS cnt FROM x GROUP BY a),
  T2 AS (SELECT COUNT(*) AS cnt FROM y GROUP BY d),
  T3 AS (SELECT b AS cnt FROM x INNER JOIN y ON x.b = y.e)
SELECT cnt FROM
  (SELECT cnt FROM T1)
  UNION ALL
  (SELECT cnt FROM T2)
  UNION ALL
  (SELECT cnt FROM T3)
{code}

{code}
MultipleInputNode(readOrder=[1,0,0,0], members=[\nUnion(all=[true], union=[cnt])\n:- Union(all=[true], union=[cnt])\n: :- Calc(select=[CAST(cnt) AS cnt])\n: : +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS cnt])\n: : +- [#3] Exchange(distribution=[hash[a]])\n: +- Calc(select=[CAST(cnt) AS cnt])\n: +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count1$0) AS cnt])\n: +- [#4] Exchange(distribution=[hash[d]])\n+- Calc(select=[b AS cnt])\n +- HashJoin(joinType=[InnerJoin], where=[=(b, e)], select=[b, e], build=[right])\n :- [#1] Exchange(distribution=[hash[b]])\n +- [#2] Exchange(distribution=[hash[e]])\n])
:- Exchange(distribution=[hash[b]])
:  +- Calc(select=[b])
:     +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx])
:- Exchange(distribution=[hash[e]])
:  +- Calc(select=[e])
:     +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
:- Exchange(distribution=[hash[a]])
:  +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
:     +- Calc(select=[a])
:        +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx])
+- Exchange(distribution=[hash[d]])
   +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(*) AS count1$0])
      +- Calc(select=[d])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
{code}

The two unions in this multiple input node are actually redundant, as the amount of data shuffled will not change even if they're moved out of the multiple input node. We should remove such redundant unions from multiple input nodes.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)