Andy created FLINK-21893:
---------------------------- Summary: A ValidationException will be thrown out when deriveRowType of Rank if partition key is an expression result of Input Node. Key: FLINK-21893 URL: https://issues.apache.org/jira/browse/FLINK-21893 Project: Flink Issue Type: New Feature Components: Table SQL / Planner Reporter: Andy A ValidationException will be thrown out when deriveRowType of Rank if partition key is an expression result of Input Node. e.g If run the following sql, A validationException will be thrown out. {code:java} //代码占位符 @Test def test(): Unit = { val data = List( (2001L, 2L), (2002L, 3L) ) val ds = failingDataSource(data).toTable(tEnv, 'video_id, 'cnt, 'proctime.proctime) tEnv.registerTable("T", ds) val sql = """ |SELECT | video_id, | cnt, | rownum_2 |FROM |( | SELECT | video_id, | cnt, | ROW_NUMBER() OVER ( | ORDER BY cnt DESC | ) AS rownum_2 | FROM | ( | SELECT | video_id, | cnt, | ROW_NUMBER() OVER ( | PARTITION BY bucket_id | ORDER BY cnt DESC | ) AS rownum_1 | FROM | ( | SELECT | video_id, | cnt, | MOD(video_id, 64) as bucket_id | FROM T | ) | ) | WHERE rownum_1 <= 1000 |) |WHERE rownum_2 <= 1000 |""".stripMargin val sink = new TestingRetractSink tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) env.execute() } {code} Exception detail {code:java} //代码占位符 org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [$2]org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [$2] at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:277) at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158) at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:162) at org.apache.flink.table.types.logical.RowType.of(RowType.java:294) at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:503) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:212) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.scala:53) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank.translateToPlan(StreamExecRank.scala:53) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |