Junning Liang created FLINK-20231:
-------------------------------------

             Summary: Sql UDTF subplan reuse on correlate
                 Key: FLINK-20231
                 URL: https://issues.apache.org/jira/browse/FLINK-20231
             Project: Flink
          Issue Type: Improvement
    Affects Versions: 1.10.1, 1.10.0
            Reporter: Junning Liang


Hi all,

I would like to start a discussion about subplan reuse on Correlate. When I wrote a test case for a UDTF with two sinks, I saw that no RelNode was reused except TableSourceScan. The code is shown below.

{code:java}
CREATE VIEW tempTable1 as
SELECT name, age, habit, length
FROM sources, LATERAL TABLE(SplitStringUDTF(habits)) as T(habit, length);

INSERT INTO sinks SELECT * FROM tempTable1;
INSERT INTO sinks1 SELECT * FROM tempTable1;
{code}

The RelNode digests of the two sinks are as follows.

{code:java}
Sink(name=[`default_catalog`.`default_database`.`sinks`], fields=[name, age, habit, length], accMode=[Acc]), rowType=[RecordType:peek_no_expand(BOOLEAN f0, RecordType:peek_no_expand(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length) f1)]
Calc(select=[name, age, f0 AS habit, f1 AS length], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length)]
Correlate(invocation=[SplitStringUDTF($cor1.habits)], correlate=[table(default_catalog.default_database.SplitStringUDTF($cor1.habits))], select=[name,age,habits,f0,f1], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)], joinType=[INNER], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)]
TableSourceScan(table=[[default_catalog, default_database, sources, source: [HDFSTableSource(name, age, habits)]]], fields=[name, age, habits], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits)]
{code}

{code:java}
Sink(name=[`default_catalog`.`default_database`.`sinks1`], fields=[name, age, habit, length], accMode=[Acc]), rowType=[RecordType:peek_no_expand(BOOLEAN f0, RecordType:peek_no_expand(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length) f1)]
Calc(select=[name, age, f0 AS habit, f1 AS length], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length)]
Correlate(invocation=[SplitStringUDTF($cor2.habits)], correlate=[table(default_catalog.default_database.SplitStringUDTF($cor2.habits))], select=[name,age,habits,f0,f1], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)], joinType=[INNER], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)]
TableSourceScan(table=[[default_catalog, default_database, sources, source: [HDFSTableSource(name, age, habits)]]], fields=[name, age, habits], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits)]
{code}

As we can see, only the TableSourceScan plan was reused. I also found a related test in SubplanReuseTest.scala, but it has carried a TODO since 2019. I hope a solution can be proposed.

{code:java}
@Test
def testSubplanReuseOnCorrelate(): Unit = {
  util.addFunction("str_split", new StringSplit())
  val sqlQuery =
    """
      |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v))
      |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
    """.stripMargin
  // TODO the sub-plan of Correlate should be reused,
  // however the digests of Correlates are different
  util.verifyPlan(sqlQuery)
}
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
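
A note on the digests in this report: the two Correlate nodes differ only in the correlation variable id ($cor1 in the first plan, $cor2 in the second), which matches the TODO in the quoted test. The sketch below illustrates, purely at the string level, how renumbering correlation variables by order of first appearance would make the two digests equal. It is an assumption-laden illustration and not how the Flink planner computes digests or decides reuse; the class name CorrelateDigestSketch and the method normalize are hypothetical, and the digest strings are shortened from the ones in the report.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only: this is NOT Flink planner code.
public class CorrelateDigestSketch {

    // Matches correlation variables such as $cor1 or $cor2 in a digest string.
    private static final Pattern COR_VAR = Pattern.compile("\\$cor(\\d+)");

    /** Renames correlation variables to $cor0, $cor1, ... in order of first appearance. */
    static String normalize(String digest) {
        Map<String, Integer> canonicalIds = new HashMap<>();
        Matcher m = COR_VAR.matcher(digest);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String originalId = m.group(1);
            Integer id = canonicalIds.get(originalId);
            if (id == null) {
                // First time this variable is seen: assign the next canonical id.
                id = canonicalIds.size();
                canonicalIds.put(originalId, id);
            }
            // quoteReplacement keeps the literal '$' from being read as a group reference.
            m.appendReplacement(out, Matcher.quoteReplacement("$cor" + id));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Shortened versions of the two Correlate digests from the report.
        String first =
            "Correlate(invocation=[SplitStringUDTF($cor1.habits)], select=[name,age,habits,f0,f1])";
        String second =
            "Correlate(invocation=[SplitStringUDTF($cor2.habits)], select=[name,age,habits,f0,f1])";

        System.out.println(first.equals(second));                       // prints false
        System.out.println(normalize(first).equals(normalize(second))); // prints true
    }
}
```

Whether such a normalization is safe inside the planner is a separate question: a correlation variable id ties the Correlate node to the expressions on its right-hand side, so any real fix would have to rename both consistently rather than rewrite a digest string after the fact.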