Caizhi Weng created FLINK-19959:
-----------------------------------
Summary: Multiple input creation algorithm will deduce an incorrect input order if the inputs are related under PIPELINED shuffle mode
Key: FLINK-19959
URL:
https://issues.apache.org/jira/browse/FLINK-19959 Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Caizhi Weng
Fix For: 1.12.0
Consider the following SQL
{code:sql}
WITH
T1 AS (SELECT x.a AS a, y.d AS b FROM y LEFT JOIN x ON y.d = x.a),
T2 AS (SELECT a, b FROM (SELECT a, b FROM T1) UNION ALL (SELECT x.a AS a, x.b AS b FROM x))
SELECT * FROM T2 LEFT JOIN t ON T2.a = t.a
{code}
The multiple input creation algorithm will currently deduce the following plan under the PIPELINED shuffle mode:
{code}
MultipleInputNode(readOrder=[0,1,1,0], members=[\nNestedLoopJoin(joinType=[LeftOuterJoin], where=[=(a, a0)], select=[a, b, a0, b0, c], build=[right])\n:- Union(all=[true], union=[a, b])\n: :- Calc(select=[a, CAST(d) AS b])\n: : +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[=(d, a)], select=[d, a], build=[right])\n: : :- [#3] Calc(select=[d])\n: : +- [#4] Exchange(distribution=[broadcast])\n: +- [#2] Calc(select=[a, b])\n+- [#1] Exchange(distribution=[broadcast])\n])
:- Exchange(distribution=[broadcast])
: +- BoundedStreamScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
:- Calc(select=[a, b])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx], reuse_id=[1])
:- Calc(select=[d])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
+- Exchange(distribution=[broadcast])
+- Calc(select=[a])
+- Reused(reference_id=[1])
{code}
It's obvious that the 2nd and the 4th input for the multiple input node should have the same input priority, otherwise a deadlock will occur.
This is because the current algorithm fails to consider the case when the inputs are related out of the multiple input node.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)