[jira] [Created] (FLINK-12321) Supports sub-plan reuse

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-12321) Supports sub-plan reuse

Shang Yuanchun (Jira)
godfrey he created FLINK-12321:
----------------------------------

             Summary: Supports sub-plan reuse
                 Key: FLINK-12321
                 URL: https://issues.apache.org/jira/browse/FLINK-12321
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: godfrey he
            Assignee: godfrey he


Many query plans have duplicated sub-plan, and their computing logic is identical. So we could reuse duplicated sub-plans to reduce computing.

Digest of RelNode is an identifier to distinguish RelNode, and the digest of sub-plan contains all inner-nodes' digests. we could use the digest of sub-plan to find duplicated sub-plan.

e.g.
{code:sql}
WITH r AS (SELECT a FROM x LIMIT 10)
SELECT r1.a FROM r r1, r r2 WHERE r1.a = r2.a

the physical plan after sub-plan reuse:
Calc(select=[a])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, a0], isBroadcast=[true], build=[right])
   :- Exchange(distribution=[hash[a]])
   :  +- Calc(select=[a], reuse_id=[1])
   :     +- Limit(offset=[0], fetch=[10], global=[true])
   :        +- Exchange(distribution=[single])
   :           +- Limit(offset=[0], fetch=[10], global=[false])
   :              +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[broadcast])
      +- Reused(reference_id=[1])
{code}

sub-plan: Calc-Limit-Exchange-Limit-TableSourceScan is reused.

For batch job, reused node might lead to a deadlock when a HashJoin or NestedLoopJoin have same reused inputs. The probe side of HashJoin could start to read data only after build side has finished. If there is no full dam (DamBehavior#FULL_DAM) operators (e.g. Sort, HashAggregate) in probe side, the data will be blocked by probe side. In this case, we could set Exchange node (if it does not exist, add new one) as BATCH mode to break up the deadlock.(The Exchange node is a full dam operator now)





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)