[jira] [Created] (FLINK-19959) Multiple input creation algorithm will deduce an incorrect input order if the inputs are related under PIPELINED shuffle mode


Shang Yuanchun (Jira)
Caizhi Weng created FLINK-19959:
-----------------------------------

             Summary: Multiple input creation algorithm will deduce an incorrect input order if the inputs are related under PIPELINED shuffle mode
                 Key: FLINK-19959
                 URL: https://issues.apache.org/jira/browse/FLINK-19959
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Caizhi Weng
             Fix For: 1.12.0


Consider the following SQL:
{code:sql}
WITH
  T1 AS (SELECT x.a AS a, y.d AS b FROM y LEFT JOIN x ON y.d = x.a),
  T2 AS (SELECT a, b FROM (SELECT a, b FROM T1) UNION ALL (SELECT x.a AS a, x.b AS b FROM x))
SELECT * FROM T2 LEFT JOIN t ON T2.a = t.a
{code}

The multiple input creation algorithm will currently deduce the following plan under the PIPELINED shuffle mode:
{code}
MultipleInputNode(readOrder=[0,1,1,0], members=[\nNestedLoopJoin(joinType=[LeftOuterJoin], where=[=(a, a0)], select=[a, b, a0, b0, c], build=[right])\n:- Union(all=[true], union=[a, b])\n:  :- Calc(select=[a, CAST(d) AS b])\n:  :  +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[=(d, a)], select=[d, a], build=[right])\n:  :     :- [#3] Calc(select=[d])\n:  :     +- [#4] Exchange(distribution=[broadcast])\n:  +- [#2] Calc(select=[a, b])\n+- [#1] Exchange(distribution=[broadcast])\n])
:- Exchange(distribution=[broadcast])
:  +- BoundedStreamScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
:- Calc(select=[a, b])
:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx], reuse_id=[1])
:- Calc(select=[d])
:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
+- Exchange(distribution=[broadcast])
   +- Calc(select=[a])
      +- Reused(reference_id=[1])
{code}

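The 2nd and the 4th inputs of the multiple input node both read from the same reused scan of x (reuse_id=[1]), but readOrder=[0,1,1,0] gives them different input priorities (1 and 0). They should have the same input priority, otherwise a deadlock will occur: under PIPELINED shuffle the shared scan has to feed both inputs at once, so it presumably stalls on the 2nd input, which is not read yet, and the 4th input never finishes.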

This is because the current algorithm fails to consider the case where the inputs are related outside the multiple input node, i.e. they share an upstream node that is not a member of the multiple input node.
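
To make the missing check concrete, here is a minimal Python sketch (not the planner's actual code, which is Java/Scala inside Flink). It models the four inputs of the plan above as hypothetical tuples of label, read order, and the set of sources reachable through pipelined edges, and reports every source that feeds inputs with different read orders. The data model and the name `find_conflicts` are assumptions made for illustration only.

```python
from collections import defaultdict

# Hypothetical model of the plan above. Each input of the multiple input node
# is (label, read_order, upstream sources reachable via pipelined edges).
inputs = [
    ("Exchange(broadcast) <- scan t", 0, {"t"}),
    ("Calc(a, b) <- scan x", 1, {"x"}),
    ("Calc(d) <- scan y", 1, {"y"}),
    ("Exchange(broadcast) <- Calc(a) <- Reused(x)", 0, {"x"}),
]


def find_conflicts(inputs):
    """Map each shared source to the (input position, read order) pairs it
    feeds, keeping only sources whose inputs disagree on read order."""
    by_source = defaultdict(list)
    for position, (_label, read_order, sources) in enumerate(inputs, start=1):
        for source in sources:
            by_source[source].append((position, read_order))
    return {
        source: users
        for source, users in by_source.items()
        if len({order for _, order in users}) > 1
    }


print(find_conflicts(inputs))  # {'x': [(2, 1), (4, 0)]}
```

The sketch flags exactly the 2nd and 4th inputs, which share the scan of x but have read orders 1 and 0. Giving both the same read order makes the conflict set empty; how the planner should choose that common priority is not stated in this report.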



--
This message was sent by Atlassian Jira
(v8.3.4#803005)