[jira] [Created] (FLINK-20231) Sql UDTF subplan reuse on correlate


Shang Yuanchun (Jira)
Junning Liang created FLINK-20231:
-------------------------------------

             Summary: Sql UDTF subplan reuse on correlate
                 Key: FLINK-20231
                 URL: https://issues.apache.org/jira/browse/FLINK-20231
             Project: Flink
          Issue Type: Improvement
    Affects Versions: 1.10.1, 1.10.0
            Reporter: Junning Liang


Hi all,

I would like to start a discussion about subplan reuse on Correlate.

When I wrote a test case for a UDTF with two sinks, I saw that nothing in the plan was reused except the TableSourceScan, judging by the RelNode digests. The SQL is shown below.
{code:java}
CREATE VIEW tempTable1 as SELECT name, age, habit, length FROM sources , LATERAL TABLE(SplitStringUDTF(habits)) as T(habit, length);

INSERT INTO sinks SELECT * FROM tempTable1;
INSERT INTO sinks1 SELECT * FROM tempTable1;
{code}
The RelNode digests of the two sinks are as follows.
{code:java}
Sink(name=[`default_catalog`.`default_database`.`sinks`], fields=[name, age, habit, length], accMode=[Acc]), rowType=[RecordType:peek_no_expand(BOOLEAN f0, RecordType:peek_no_expand(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length) f1)]
   Calc(select=[name, age, f0 AS habit, f1 AS length], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length)]
      Correlate(invocation=[SplitStringUDTF($cor1.habits)], correlate=[table(default_catalog.default_database.SplitStringUDTF($cor1.habits))], select=[name,age,habits,f0,f1], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)], joinType=[INNER], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)]  
          TableSourceScan(table=[[default_catalog, default_database, sources, source: [HDFSTableSource(name, age, habits)]]], fields=[name, age, habits], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits)]
{code}
{code:java}
Sink(name=[`default_catalog`.`default_database`.`sinks1`], fields=[name, age, habit, length], accMode=[Acc]), rowType=[RecordType:peek_no_expand(BOOLEAN f0, RecordType:peek_no_expand(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length) f1)]
   Calc(select=[name, age, f0 AS habit, f1 AS length], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length)]
      Correlate(invocation=[SplitStringUDTF($cor2.habits)], correlate=[table(default_catalog.default_database.SplitStringUDTF($cor2.habits))], select=[name,age,habits,f0,f1], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)], joinType=[INNER], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)]
          TableSourceScan(table=[[default_catalog, default_database, sources, source: [HDFSTableSource(name, age, habits)]]], fields=[name, age, habits], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits)]
{code}
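As we can see, only the TableSourceScan plan was reused. The two Correlate digests differ only in their correlation variable ($cor1 versus $cor2), which is enough to prevent reuse. I also found a related test in SubplanReuseTest.scala, but it has been marked as a TODO since 2019.

I hope a solution can be proposed. The related test is: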
{code:java}
@Test def testSubplanReuseOnCorrelate(): Unit = { 
   util.addFunction("str_split", new StringSplit())
   val sqlQuery = 
     """ 
       |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v)) 
       |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v 
     """.stripMargin 
   // TODO the sub-plan of Correlate should be reused, 
   // however the digests of Correlates are different 
   util.verifyPlan(sqlQuery) 
}
{code}
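
To make the mismatch concrete, here is a minimal, self-contained sketch of one possible direction: renumbering correlation variables in order of first appearance before comparing digests, so that $cor1 and $cor2 in the Correlate digests above map to the same canonical name. This is only an illustration on strings, not how the Flink planner works (it operates on RelNode trees). The class name CorrelDigestSketch and the method normalize are hypothetical, and the digests are abbreviated from the plans above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only: not part of Flink or Calcite.
public class CorrelDigestSketch {

    // Matches correlation variables such as $cor1 or $cor2.
    private static final Pattern COR_VAR = Pattern.compile("\\$cor\\d+");

    // Renames correlation variables to $cor0, $cor1, ... in order of first appearance.
    public static String normalize(String digest) {
        Map<String, String> renamed = new HashMap<>();
        Matcher m = COR_VAR.matcher(digest);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group();
            String canonical = renamed.get(name);
            if (canonical == null) {
                canonical = "$cor" + renamed.size();
                renamed.put(name, canonical);
            }
            // quoteReplacement keeps the literal '$' from being read as a group reference.
            m.appendReplacement(out, Matcher.quoteReplacement(canonical));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Abbreviated Correlate digests taken from the two sink plans.
        String sink0 = "Correlate(invocation=[SplitStringUDTF($cor1.habits)], joinType=[INNER])";
        String sink1 = "Correlate(invocation=[SplitStringUDTF($cor2.habits)], joinType=[INNER])";

        System.out.println(sink0.equals(sink1));                       // prints "false"
        System.out.println(normalize(sink0).equals(normalize(sink1))); // prints "true"
    }
}
```

Whether such a normalization is safe inside the planner is an open question: nested correlates that refer to an outer correlation variable would have to keep their relative naming, which is why the sketch renumbers by first appearance instead of dropping the ids.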