godfrey he created FLINK-12321:
----------------------------------
Summary: Supports sub-plan reuse
Key: FLINK-12321
URL: https://issues.apache.org/jira/browse/FLINK-12321
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he
Many query plans contain duplicated sub-plans whose computing logic is identical, so we could reuse the duplicated sub-plans to avoid redundant computation.
The digest of a RelNode is an identifier that distinguishes RelNodes, and the digest of a sub-plan contains the digests of all its inner nodes, so we could use the sub-plan digest to find duplicated sub-plans.
e.g.
{code:sql}
WITH r AS (SELECT a FROM x LIMIT 10)
SELECT r1.a FROM r r1, r r2 WHERE r1.a = r2.a
{code}
The physical plan after sub-plan reuse:
{code}
Calc(select=[a])
+- HashJoin(joinType=[InnerJoin], where=[=(a, a0)], select=[a, a0], isBroadcast=[true], build=[right])
   :- Exchange(distribution=[hash[a]])
   :  +- Calc(select=[a], reuse_id=[1])
   :     +- Limit(offset=[0], fetch=[10], global=[true])
   :        +- Exchange(distribution=[single])
   :           +- Limit(offset=[0], fetch=[10], global=[false])
   :              +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[broadcast])
      +- Reused(reference_id=[1])
{code}
The sub-plan Calc-Limit-Exchange-Limit-TableSourceScan is reused.
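The digest-based lookup can be sketched as follows, assuming a hypothetical `PlanNode` class whose `digest()` concatenates the node's own description with the digests of its inputs (a stand-in for a Calcite `RelNode` digest, not the planner's actual code). A depth-first rewrite keeps the first node seen for each digest and returns that same instance for every later match:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plan node for this sketch; not Calcite's RelNode. */
final class PlanNode {
    final String explain;        // node-local description, e.g. "Limit(fetch=[10])"
    final List<PlanNode> inputs;

    PlanNode(String explain, PlanNode... inputs) {
        this.explain = explain;
        this.inputs = List.of(inputs);
    }

    /** Digest of the whole sub-plan: own description followed by all input digests. */
    String digest() {
        StringBuilder sb = new StringBuilder(explain).append('[');
        for (int i = 0; i < inputs.size(); i++) {
            if (i > 0) sb.append(", ");
            sb.append(inputs.get(i).digest());
        }
        return sb.append(']').toString();
    }
}

final class SubplanReuse {
    /** Returns a plan in which sub-plans with equal digests are one shared instance. */
    static PlanNode reuse(PlanNode root) {
        return rewrite(root, new HashMap<>());
    }

    private static PlanNode rewrite(PlanNode node, Map<String, PlanNode> byDigest) {
        String digest = node.digest();
        PlanNode existing = byDigest.get(digest);
        if (existing != null) {
            return existing; // identical sub-plan seen before: share it, do not descend
        }
        PlanNode[] newInputs = new PlanNode[node.inputs.size()];
        for (int i = 0; i < newInputs.length; i++) {
            newInputs[i] = rewrite(node.inputs.get(i), byDigest);
        }
        PlanNode rewritten = new PlanNode(node.explain, newInputs);
        byDigest.put(digest, rewritten); // register only after the inputs are rewritten
        return rewritten;
    }
}
```

Because the traversal stops at the first match, the second occurrence is replaced at its root (the Calc node), which corresponds to the single `Reused(reference_id=[1])` in the plan above. This sketch recomputes digests of the same subtrees repeatedly; a real implementation would cache them.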
For batch jobs, a reused node may lead to a deadlock when both inputs of a HashJoin or NestedLoopJoin come from the same reused node. The probe side of a HashJoin can start to read data only after the build side has finished. If there is no full-dam operator (DamBehavior#FULL_DAM, e.g. Sort or HashAggregate) on the probe side, the data sent by the reused node is blocked at the probe side; because the same node also feeds the build side, the build side can never finish. In this case, we could set an Exchange node to BATCH mode (adding a new one if none exists) to break up the deadlock, since an Exchange node in BATCH mode is a full-dam operator.
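A minimal sketch of the check, assuming a hypothetical `Node` class (not Flink's planner API) with flags for full-dam operators and Exchange nodes. It reports a deadlock risk when a node reachable from the build side can also be reached from the probe side without passing a full dam, and then puts the probe-side Exchange into BATCH mode, inserting one if the probe input is not an Exchange. Placing the BATCH Exchange on the probe side is an assumption of this sketch, and `breakDeadlock` with its parameters is hypothetical:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** Hypothetical batch plan node used only for this sketch. */
final class Node {
    final String name;
    final boolean fullDam;   // e.g. Sort, HashAggregate (DamBehavior#FULL_DAM)
    final boolean exchange;  // true for Exchange nodes
    boolean batchMode;       // an Exchange in BATCH mode behaves as a full dam
    final List<Node> inputs;

    Node(String name, boolean fullDam, boolean exchange, Node... inputs) {
        this.name = name;
        this.fullDam = fullDam;
        this.exchange = exchange;
        this.inputs = new ArrayList<>(List.of(inputs));
    }

    boolean damsFully() {
        return fullDam || (exchange && batchMode);
    }
}

final class DeadlockBreakup {
    /** Nodes reachable from start; optionally stops descending below full-dam nodes. */
    private static Set<Node> reachable(Node start, boolean stopAtDam) {
        Set<Node> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Deque<Node> stack = new ArrayDeque<>();
        stack.push(start);
        while (!stack.isEmpty()) {
            Node n = stack.pop();
            // The dam itself is recorded, but its inputs are shielded from the probe side.
            if (!seen.add(n) || (stopAtDam && n.damsFully())) {
                continue;
            }
            for (Node in : n.inputs) stack.push(in);
        }
        return seen;
    }

    /**
     * Input buildIdx of the join is consumed completely before input probeIdx.
     * Returns true if a BATCH Exchange was set or inserted on the probe side.
     */
    static boolean breakDeadlock(Node join, int buildIdx, int probeIdx) {
        Set<Node> buildSide = reachable(join.inputs.get(buildIdx), false);
        Set<Node> probePipelined = reachable(join.inputs.get(probeIdx), true);
        boolean shared = probePipelined.stream().anyMatch(buildSide::contains);
        if (!shared) {
            return false; // no shared node can be blocked by the probe side
        }
        Node probe = join.inputs.get(probeIdx);
        if (probe.exchange) {
            probe.batchMode = true;
        } else {
            Node batchExchange = new Node("Exchange", false, true, probe);
            batchExchange.batchMode = true;
            join.inputs.set(probeIdx, batchExchange);
        }
        return true;
    }
}
```

A full-dam node is still added to the probe-side set before traversal stops, because a shared Sort whose output is blocked at the probe side would stall the build side as well.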
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)