[jira] [Created] (FLINK-18119) Fix unlimitedly growing state for time range bounded over aggregate

Shang Yuanchun (Jira)
Hyeonseop Lee created FLINK-18119:
-------------------------------------

             Summary: Fix unlimitedly growing state for time range bounded over aggregate
                 Key: FLINK-18119
                 URL: https://issues.apache.org/jira/browse/FLINK-18119
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Runtime
            Reporter: Hyeonseop Lee


For a time-range-bounded over aggregation in a streaming query, such as the following Table API (Scala) query,

 
{code:scala}
// aggregateFunction stands for any aggregate function, e.g. 'b.sum
table
  .window(Over partitionBy 'a orderBy 'rowtime preceding 1.hour as 'w)
  .select('a, aggregateFunction('b) over 'w)
{code}

the operator must keep the incoming records of the preceding time range (here, one hour) in state, but records older than that are no longer required and can be cleaned up.

The current implementation cleans up old records only when a newer record arrives, at which point the operator knows that enough time has passed. However, this cleanup never happens unless a new record with the same key arrives, so the state of a key that stops receiving records may never be cleaned up. This leads to unboundedly growing state, especially when the key space changes over time.
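
To make the leak concrete, here is a simplified, self-contained model in plain Java. It is not the actual Flink operator; `LazyRangeOverSum`, `process`, `bufferedRecords` and `keys` are hypothetical names chosen for illustration. The sketch assumes records arrive in timestamp order per key and that a record at time `ts` aggregates all buffered records of its key with timestamps in `[ts - range, ts]`. As described above, expired records are evicted only inside `process`, i.e. only for the key of the incoming record:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified model (not Flink code): per-key state of a range-bounded OVER sum
// that is cleaned up lazily, i.e. only when a new record for the same key arrives.
class LazyRangeOverSum {
    private final long rangeMillis;
    // Hypothetical stand-in for keyed state: key -> buffered {timestamp, value} pairs in arrival order.
    private final Map<String, Deque<long[]>> state = new HashMap<>();

    LazyRangeOverSum(long rangeMillis) {
        this.rangeMillis = rangeMillis;
    }

    /** Processes one record (assumed in timestamp order per key); returns the sum over [ts - range, ts]. */
    long process(String key, long ts, long value) {
        Deque<long[]> buffer = state.computeIfAbsent(key, k -> new ArrayDeque<>());
        // The only cleanup point: it runs for the incoming record's key and no other.
        while (!buffer.isEmpty() && buffer.peekFirst()[0] < ts - rangeMillis) {
            buffer.pollFirst();
        }
        buffer.addLast(new long[] {ts, value});
        long sum = 0;
        for (long[] entry : buffer) {
            sum += entry[1];
        }
        return sum;
    }

    /** Total number of buffered records across all keys. */
    int bufferedRecords() {
        int n = 0;
        for (Deque<long[]> buffer : state.values()) {
            n += buffer.size();
        }
        return n;
    }

    /** Number of keys that still hold state. */
    int keys() {
        return state.size();
    }
}

class LazyCleanupDemo {
    public static void main(String[] args) {
        long hour = 3_600_000L;
        LazyRangeOverSum op = new LazyRangeOverSum(hour);
        // Each key appears once, ten hours apart: a key space that changes over time.
        for (int i = 0; i < 10; i++) {
            op.process("user-" + i, i * 10 * hour, 1);
        }
        System.out.println(op.bufferedRecords()); // prints 10
    }
}
```

Every key in the demo is seen exactly once, so none of its buffered records is ever evicted, even though all but the most recent one are far outside the one-hour range.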

Since an aggregation over a bounded preceding time interval by its nature never needs records older than that interval, we can improve this by adding a timer that notifies the operator to clean up old records, without changing query results or severely degrading performance.
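
A sketch of the proposed timer-based cleanup, again as a simplified plain-Java model rather than Flink's `KeyedProcessFunction` and timer service. The class `TimerCleanupRangeOverSum`, its methods and the `TreeMap`-based timer queue are assumptions for illustration, and records are assumed to arrive in timestamp order. Each record registers a cleanup timer at `ts + range + 1`, the first instant at which it leaves the range `[t - range, t]`; when time (e.g. the watermark) reaches that timer, the key's expired records are evicted and the key is dropped once its buffer is empty. Only records that can no longer fall into any later window are removed, so the aggregate results match the lazy version:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Simplified model (not Flink code): a range-bounded OVER sum whose per-key state is
// additionally cleaned by timers, so keys that never see another record are released.
class TimerCleanupRangeOverSum {
    private final long rangeMillis;
    private final Map<String, Deque<long[]>> state = new HashMap<>();
    // Hypothetical timer service: timer timestamp -> keys with a cleanup timer at that time.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    TimerCleanupRangeOverSum(long rangeMillis) {
        this.rangeMillis = rangeMillis;
    }

    /** Processes one record (assumed in timestamp order); returns the sum over [ts - range, ts]. */
    long process(String key, long ts, long value) {
        Deque<long[]> buffer = state.computeIfAbsent(key, k -> new ArrayDeque<>());
        evictExpired(buffer, ts);
        buffer.addLast(new long[] {ts, value});
        // Cleanup timer at the first instant this record is out of range; the Set deduplicates per key.
        timers.computeIfAbsent(ts + rangeMillis + 1, t -> new HashSet<>()).add(key);
        long sum = 0;
        for (long[] entry : buffer) {
            sum += entry[1];
        }
        return sum;
    }

    /** Advances time (e.g. the watermark) and fires every timer with timestamp <= time. */
    void advanceTime(long time) {
        while (!timers.isEmpty() && timers.firstKey() <= time) {
            Map.Entry<Long, Set<String>> fired = timers.pollFirstEntry();
            for (String key : fired.getValue()) {
                Deque<long[]> buffer = state.get(key);
                if (buffer == null) {
                    continue;
                }
                evictExpired(buffer, fired.getKey());
                // Release the key entirely once nothing in its range is left.
                if (buffer.isEmpty()) {
                    state.remove(key);
                }
            }
        }
    }

    private void evictExpired(Deque<long[]> buffer, long now) {
        while (!buffer.isEmpty() && buffer.peekFirst()[0] < now - rangeMillis) {
            buffer.pollFirst();
        }
    }

    int bufferedRecords() {
        int n = 0;
        for (Deque<long[]> buffer : state.values()) {
            n += buffer.size();
        }
        return n;
    }

    int keys() {
        return state.size();
    }
}

class TimerCleanupDemo {
    public static void main(String[] args) {
        long hour = 3_600_000L;
        TimerCleanupRangeOverSum op = new TimerCleanupRangeOverSum(hour);
        for (int i = 0; i < 10; i++) {
            long ts = i * 10 * hour;
            op.advanceTime(ts); // time (watermark) reaches the record's timestamp first
            op.process("user-" + i, ts, 1);
        }
        System.out.println(op.bufferedRecords()); // prints 1
    }
}
```

With the same input as a lazily cleaned operator, only the most recent key still holds state. The extra cost is at most one timer per distinct (key, timestamp) pair; a real implementation might coalesce timers further, which this sketch does not attempt.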

This is distinct from state retention: state retention discards state that is expected to be less important in order to reduce state size, so it may change query results. Both enabling and disabling state retention still make sense with this change.

This issue applies to both row-time and processing-time range bounds, so the change touches both RowTimeRangeBoundedPrecedingFunction and ProcTimeRangeBoundedPrecedingFunction in flink-table-runtime-blink. I already run a version with this change in production and would be glad to contribute it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)