[jira] [Created] (FLINK-22038) Update TopN node to be without rowNumber if rowNumber field is projected out after TopN

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-22038) Update TopN node to be without rowNumber if rowNumber field is projected out after TopN

Shang Yuanchun (Jira)
Andy created FLINK-22038:
----------------------------

             Summary: Update TopN node to be without rowNumber if rowNumber field is projected out after TopN
                 Key: FLINK-22038
                 URL: https://issues.apache.org/jira/browse/FLINK-22038
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Andy
         Attachments: image-2021-03-30-16-03-09-876.png

As describe in article [sql_queries|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql/queries.html#no-ranking-output-optimization], an optimization way to improve performance of TopN is omitting rownum field in the outer SELECT clause of the Top-N query. 

However, some queries generated unexpected plan, even though we have followed the instructions in the documentation.
{code:java}
@Test
def testRowNumberFiltered(): Unit = {
  util.addDataStream[(String, Long, Long, Long)](
    "T", 'uri, 'reqcount, 'start_time, 'bucket_id)

  val sql =
    """
      |SELECT
      |  uri,
      |  reqcount,
      |  start_time
      |FROM
      |  (
      |    SELECT
      |      uri,
      |      reqcount,
      |      rownum_2,
      |      start_time
      |    FROM
      |      (
      |        SELECT
      |          uri,
      |          reqcount,
      |          start_time,
      |          ROW_NUMBER() OVER (
      |            PARTITION BY start_time
      |            ORDER BY
      |              reqcount DESC
      |          ) AS rownum_2
      |        FROM
      |          (
      |            SELECT
      |            uri,
      |            reqcount,
      |            start_time,
      |            ROW_NUMBER() OVER (
      |                PARTITION BY start_time, bucket_id
      |                ORDER BY
      |                reqcount DESC
      |            ) AS rownum_1
      |            FROM T
      |          )
      |        WHERE
      |          rownum_1 <= 100000
      |      )
      |    WHERE
      |      rownum_2 <= 100000
      |)
      |""".stripMargin
  util.verifyExecPlan(sql)
}
{code}
For example, we expect both outer and inner TopN could use without rowNumber optimization in the above queries, however inner TopN is not as we expected.

The logical plan and physical plan as following, we could find even though the rowNumber field is projected out after inner topN, inner topN still with rowNumber.

 

!image-2021-03-30-16-03-09-876.png!

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)