Andy created FLINK-22038:
----------------------------
Summary: Update TopN node to be without rowNumber if rowNumber field is projected out after TopN
Key: FLINK-22038
URL: https://issues.apache.org/jira/browse/FLINK-22038
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Andy
Attachments: image-2021-03-30-16-03-09-876.png
As described in the article [sql_queries|
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql/queries.html#no-ranking-output-optimization], one way to improve the performance of TopN is to omit the rownum field in the outer SELECT clause of the Top-N query.
However, some queries generate an unexpected plan even though they follow the instructions in the documentation.
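For reference, the documented pattern can be sketched roughly as follows for a single-level Top-N over the same table T used in the test below. The object name NoRankingOutputExample and the helper topNQuery are hypothetical and exist only for illustration; they are not part of Flink. With selectRowNumber = false the outer SELECT omits rownum, which is the form the documentation describes for this optimization.

{code:scala}
object NoRankingOutputExample {

  // Hypothetical helper (not a Flink API): builds a single-level Top-N query
  // over table T. When selectRowNumber is false, the outer SELECT does not
  // project rownum, which is the pattern described in the documentation.
  def topNQuery(selectRowNumber: Boolean): String = {
    val baseFields = Seq("uri", "reqcount", "start_time")
    val outerFields = if (selectRowNumber) baseFields :+ "rownum" else baseFields
    s"""
       |SELECT ${outerFields.mkString(", ")}
       |FROM (
       |  SELECT uri, reqcount, start_time,
       |    ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY reqcount DESC) AS rownum
       |  FROM T
       |)
       |WHERE rownum <= 100000
       |""".stripMargin
  }

  def main(args: Array[String]): Unit = {
    // Print the variant that omits rownum from the outer SELECT.
    println(topNQuery(selectRowNumber = false))
  }
}
{code}

The inner query still computes ROW_NUMBER() because the WHERE clause filters on it; only the outer projection differs between the two variants.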
{code:java}
@Test
def testRowNumberFiltered(): Unit = {
  util.addDataStream[(String, Long, Long, Long)](
    "T", 'uri, 'reqcount, 'start_time, 'bucket_id)
  val sql =
    """
      |SELECT
      |  uri,
      |  reqcount,
      |  start_time
      |FROM
      |  (
      |    SELECT
      |      uri,
      |      reqcount,
      |      rownum_2,
      |      start_time
      |    FROM
      |      (
      |        SELECT
      |          uri,
      |          reqcount,
      |          start_time,
      |          ROW_NUMBER() OVER (
      |            PARTITION BY start_time
      |            ORDER BY
      |              reqcount DESC
      |          ) AS rownum_2
      |        FROM
      |          (
      |            SELECT
      |              uri,
      |              reqcount,
      |              start_time,
      |              ROW_NUMBER() OVER (
      |                PARTITION BY start_time, bucket_id
      |                ORDER BY
      |                  reqcount DESC
      |              ) AS rownum_1
      |            FROM T
      |          )
      |        WHERE
      |          rownum_1 <= 100000
      |      )
      |    WHERE
      |      rownum_2 <= 100000
      |  )
      |""".stripMargin
  util.verifyExecPlan(sql)
}
{code}
For example, we expect both the outer and the inner TopN in the query above to use the without-rowNumber optimization; however, the inner TopN does not.
The logical plan and physical plan are shown below. Even though the rowNumber field is projected out after the inner TopN, the inner TopN still outputs rowNumber.
!image-2021-03-30-16-03-09-876.png!