Andy created FLINK-20909:
----------------------------
Summary: MiniBatch Interval derivation does not work well when enable miniBatch optimization in a job which contains deduplicate on row and unbounded aggregate.
Key: FLINK-20909
URL: https://issues.apache.org/jira/browse/FLINK-20909
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Andy
MiniBatch interval derivation does not work well when MiniBatch optimization is enabled in a job that contains a deduplicate on rowtime and an unbounded aggregate.
{code:java}
@Test
def testLastRowOnRowtime1(): Unit = {
  val t = env.fromCollection(rowtimeTestData)
    .assignTimestampsAndWatermarks(new RowtimeExtractor)
    .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
  tEnv.registerTable("T", t)
  tEnv.executeSql(
    s"""
       |CREATE TABLE rowtime_sink (
       |  cnt BIGINT
       |) WITH (
       |  'connector' = 'values',
       |  'sink-insert-only' = 'false',
       |  'changelog-mode' = 'I,UA,D'
       |)
       |""".stripMargin)
  val sql =
    """
      |INSERT INTO rowtime_sink
      |SELECT COUNT(b) FROM (
      |  SELECT a, b, c, rowtime
      |  FROM (
      |    SELECT *,
      |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY rowtime DESC) as rowNum
      |    FROM T
      |  )
      |  WHERE rowNum = 1
      |)
    """.stripMargin
  tEnv.executeSql(sql).await()
  val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
}{code}
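The test above does not show how MiniBatch is switched on. A minimal sketch of the settings, assuming the standard `table.exec.mini-batch.*` options and reusing `tEnv` from the test: the 1 s latency is chosen only to match the `interval=[1000ms]` in the plan below, and the batch size of 5000 is an arbitrary assumption, not a value taken from this issue.

```scala
// Fragment, not a standalone program: `tEnv` is the table environment from the test above.
val conf = tEnv.getConfig.getConfiguration
// Turn on MiniBatch optimization.
conf.setBoolean("table.exec.mini-batch.enabled", true)
// Assumed latency; 1 s corresponds to interval=[1000ms] in the plan.
conf.setString("table.exec.mini-batch.allow-latency", "1 s")
// Assumed maximum number of buffered records per batch.
conf.setLong("table.exec.mini-batch.size", 5000L)
```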
For example, for the SQL above, when MiniBatch optimization is enabled, the optimized plan is as follows.
{code:java}
Sink(table=[default_catalog.default_database.rowtime_sink], fields=[EXPR$0])
+- GlobalGroupAggregate(select=[COUNT_RETRACT(count$0) AS EXPR$0])
   +- Exchange(distribution=[single])
      +- LocalGroupAggregate(select=[COUNT_RETRACT(b) AS count$0, COUNT_RETRACT(*) AS count1$1])
         +- Calc(select=[b])
            +- Deduplicate(keep=[LastRow], key=[b], order=[ROWTIME])
               +- Exchange(distribution=[hash[b]])
                  +- Calc(select=[b, rowtime])
                     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                        +- DataStreamScan(table=[[default_catalog, default_database, T]], fields=[a, b, c, rowtime]){code}
A MiniBatchAssigner is inserted into the plan. This behavior is problematic because Deduplicate depends on rowtime, whereas `ProcTimeMiniBatchAssignerOperator` emits a watermark once per MiniBatch interval (1000ms in the plan above) based on processing time. For Deduplicate, the incoming watermark has no relationship to the rowtime of the incoming records, so it cannot guarantee that the rowtime of every subsequent input record is greater than or equal to the current watermark.
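The mismatch can be illustrated with a small standalone simulation. This is only a sketch modelled loosely on the idea of a processing-time assigner, not the Flink operator itself: `ProcTimeWatermarkSketch`, `Record`, `procTimeWatermark` and `violations` are hypothetical names, and the watermark is assumed to be the start of the current wall-clock interval.

```scala
object ProcTimeWatermarkSketch {
  // A record carrying only its event-time (rowtime) timestamp, in milliseconds.
  final case class Record(rowtime: Long)

  // Assumed behavior: the watermark is the start of the current processing-time
  // interval, so it is computed without looking at any record's rowtime.
  def procTimeWatermark(processingTime: Long, intervalMs: Long): Long =
    processingTime - (processingTime % intervalMs)

  // Returns the records whose rowtime is smaller than the watermark already
  // emitted when they arrive, i.e. records that break the guarantee a
  // rowtime-based operator such as Deduplicate expects from a watermark.
  // Each arrival is a pair of (processing time at arrival, record).
  def violations(arrivals: Seq[(Long, Record)], intervalMs: Long): Seq[Record] =
    arrivals.foldLeft((Long.MinValue, Vector.empty[Record])) {
      case ((watermark, late), (procTime, record)) =>
        // The watermark only ever advances.
        val current = math.max(watermark, procTimeWatermark(procTime, intervalMs))
        (current, if (record.rowtime < current) late :+ record else late)
    }._2

  def main(args: Array[String]): Unit = {
    // Small rowtimes arriving at much larger processing times.
    val arrivals = Seq(
      (1000L, Record(rowtime = 5L)),
      (2000L, Record(rowtime = 3L)),
      (3000L, Record(rowtime = 8L))
    )
    println(violations(arrivals, intervalMs = 1000L))
  }
}
```

In this model every record whose rowtime lags behind the wall clock shows up as a violation, which is why a processing-time watermark cannot serve as a completeness signal for an operator ordered by rowtime.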
--
This message was sent by Atlassian Jira
(v8.3.4#803005)