[jira] [Created] (FLINK-22121) FlinkLogicalRankRuleBase should check if name of rankNumberType already exists in the input


Shang Yuanchun (Jira)
Caizhi Weng created FLINK-22121:
-----------------------------------

             Summary: FlinkLogicalRankRuleBase should check if name of rankNumberType already exists in the input
                 Key: FLINK-22121
                 URL: https://issues.apache.org/jira/browse/FLINK-22121
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.13.0
            Reporter: Caizhi Weng
             Fix For: 1.13.0


Add the following test case to {{org.apache.flink.table.planner.plan.stream.sql.RankTest}} to reproduce this issue.

{code:scala}
@Test
def myTest(): Unit = {
  val sql =
    """
      |SELECT CAST(rna AS INT) AS rn1, CAST(rnb AS INT) AS rn2 FROM (
      |  SELECT *, row_number() over (partition by a order by b desc) AS rnb
      |  FROM (
      |    SELECT *, row_number() over (partition by a, c order by b desc) AS rna
      |    FROM MyTable
      |  )
      |  WHERE rna <= 100
      |)
      |WHERE rnb <= 100
      |""".stripMargin
  util.verifyExecPlan(sql)
}
{code}

The exception stack trace is:
{code}
org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [w0$o0]

        at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:272)
        at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:157)
        at org.apache.flink.table.types.logical.RowType.of(RowType.java:297)
        at org.apache.flink.table.types.logical.RowType.of(RowType.java:289)
        at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:632)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank.translateToExecNode(StreamPhysicalRank.scala:117)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:54)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToExecNodeGraph(PlannerBase.scala:314)
        at org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:895)
        at org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
        at org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:583)
{code}

This happens because the rank field is currently always named {{w0$o0}}. When the input of a Rank is itself another Rank, the output row type ends up with two fields named {{w0$o0}}, and the exception above is thrown. To fix this, {{FlinkLogicalRankRuleBase}} should check whether the rank field name already occurs in the input and avoid reusing it.
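
As a rough illustration of the check described above, here is a minimal, self-contained sketch. It is not the actual Flink fix: the object {{RankFieldNames}}, the method {{uniqueRankFieldName}}, and the numeric-suffix naming scheme are all hypothetical and chosen only for illustration.

{code:scala}
// Hypothetical helper, not part of Flink: returns a rank field name
// that does not collide with any field name of the input.
object RankFieldNames {

  def uniqueRankFieldName(
      inputFieldNames: Seq[String],
      candidate: String = "w0$o0"): String = {
    val taken = inputFieldNames.toSet
    if (!taken.contains(candidate)) {
      // No conflict: keep the default name.
      candidate
    } else {
      // Conflict: append the smallest numeric suffix that gives an unused name.
      // The suffix format is an assumption made for this sketch.
      Iterator
        .from(0)
        .map(i => s"${candidate}_$i")
        .find(name => !taken.contains(name))
        .get
    }
  }
}
{code}

With an input that already contains {{w0$o0}} (for example the output of an inner Rank), the helper would return a different name, so the outer Rank's row type would no longer contain duplicates.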



--
This message was sent by Atlassian Jira
(v8.3.4#803005)