[DISCUSS] Proposal: New feature of flink window UDF


Suhan
Hi all


Currently, Flink supports only a limited set of window functions such as TUMBLE, HOP, and SESSION. They are useful in many cases, but sometimes
they cannot meet business requirements that need more complicated window logic.
Here are some requirements, all on keyed streams, that the current window functions cannot satisfy:

In an ad system, we want to accumulate the show count of each ad, and we need a fixed time window, such as 1 hour, to aggregate the result. But the window for an ad must not start before some specified action on that ad happens, so each ad has a different window range.
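
To make this first requirement concrete, here is a minimal plain-Java sketch of a per-ad window assigner. It is not Flink API: the class name, the "start" event name, and the choice to tumble hourly windows from each ad's first start event are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: assigns each event to a fixed-size window whose
// origin is the first "start" event seen for that ad.
final class PerAdWindowAssigner {
    private final long sizeMillis;
    // Timestamp of the first "start" event per ad; absent until it arrives.
    private final Map<String, Long> origin = new HashMap<>();

    PerAdWindowAssigner(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    /**
     * Returns the start time of the window the event belongs to,
     * or -1 if the ad's window has not been started yet.
     */
    long assign(String adId, String eventName, long timestampMillis) {
        if ("start".equals(eventName)) {
            origin.putIfAbsent(adId, timestampMillis);
        }
        Long first = origin.get(adId);
        if (first == null || timestampMillis < first) {
            return -1L;
        }
        // Windows are aligned to the ad's own start, not to the epoch.
        return first + ((timestampMillis - first) / sizeMillis) * sizeMillis;
    }
}
```

With a one-hour size, an ad started at 5000 ms gets windows [5000, 3605000), [3605000, 7205000), and so on, while events for an ad that has not started are not assigned to any window.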

A more complicated scenario: for each ad, we start the window after some specified event happens, but we need a global window to accumulate some metrics of that ad. Each time a new ad event metric comes in, the global window fires. In addition, the global window fires periodically every hour to report the current status to downstream consumers, even if no new ad events have arrived. The emitted result can be considered as (ad_id, window_start_time, current_time, metric_number). A downstream consumer may be a strategy rule engine that takes action according to the result. Further, to avoid OOM from the global window state accumulating indefinitely, we can send a purge message in some special format into the stream so that the window function purges the global window once we no longer need that ad.
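
The behaviour in this second scenario can be simulated in plain Java as a rough sketch. Nothing here is Flink API: the class, the "start" and "purge" event names, and the advanceTo method standing in for timers are assumptions chosen only to show the three behaviours (fire per event, fire per period, purge on request).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation; every name here is hypothetical.
final class AdGlobalWindowSketch {
    /** One emitted row: (ad_id, window_start_time, current_time, metric_number). */
    static final class Row {
        final String adId;
        final long windowStart;
        final long currentTime;
        final long metric;

        Row(String adId, long windowStart, long currentTime, long metric) {
            this.adId = adId;
            this.windowStart = windowStart;
            this.currentTime = currentTime;
            this.metric = metric;
        }
    }

    private static final class State {
        long windowStart;
        long sum;
        long nextTimer;
    }

    private final long periodMillis;
    private final Map<String, State> states = new HashMap<>();
    final List<Row> emitted = new ArrayList<>();

    AdGlobalWindowSketch(long periodMillis) {
        this.periodMillis = periodMillis;
    }

    void onEvent(String adId, String eventName, long metric, long ts) {
        if ("purge".equals(eventName)) {
            states.remove(adId); // drop all state for an ad we no longer need
            return;
        }
        State s = states.get(adId);
        if ("start".equals(eventName)) {
            if (s == null) {
                s = new State();
                s.windowStart = ts;
                s.nextTimer = ts + periodMillis;
                states.put(adId, s);
            }
            return;
        }
        if (s == null) {
            return; // events before the start event are ignored
        }
        s.sum += metric;
        emitted.add(new Row(adId, s.windowStart, ts, s.sum)); // fire on every metric event
    }

    /** Simulates time advancing: fires every periodic timer due at or before now. */
    void advanceTo(long now) {
        for (Map.Entry<String, State> e : states.entrySet()) {
            State s = e.getValue();
            while (s.nextTimer <= now) {
                emitted.add(new Row(e.getKey(), s.windowStart, s.nextTimer, s.sum));
                s.nextTimer += periodMillis;
            }
        }
    }
}
```

In this sketch the periodic report reuses the last accumulated value, so downstream consumers still receive a row each hour when no new ad events arrive, and a purged ad stops producing rows entirely.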


These are classic use cases that cannot currently be supported purely in SQL. Enabling customized window UDFs would be very helpful and would greatly enrich the functionality of Flink SQL.




Public Interfaces
We need to extend the current SQL syntax to support the definition of window UDFs.
create [TEMPORARY|TEMPORARY SYSTEM] window function [IF NOT EXISTS] [catalog_name.][db_name.]function_name AS identifier (col_name col_type, ....)

TEMPORARY
Create a temporary catalog function that has catalog and database namespaces and overrides catalog functions.
TEMPORARY SYSTEM
Create a temporary system function that has no namespace and overrides built-in functions.
IF NOT EXISTS
If the function already exists, nothing will happen.
IDENTIFIER
The fully qualified class name of the window function.
col_name col_type...
The result columns emitted by the window.


For example:
create temporary window function ad_sum_window as 'com.examples.AdSumWindowFunction';
select 
ad_id,
ad_sum_window_window_start_time,
ad_sum_window_current_time,
ad_sum_window_sum_value as convert_cnt
from event_streams
group by
ad_id, ad_sum_window('start', convert_cnt, interval '1' hour)

ad_sum_window is a customized window function defined in com.examples.AdSumWindowFunction; a detailed interface design for this new type of function will be introduced later. This window function waits for event_name="start" in event_streams before it begins accumulating the convert_cnt metric. The window also fires periodically, every 1 hour, to report the current status. The only thing we need to know is that the produced columns of ad_sum_window can be inferred from the class definition. For example, if ad_sum_window produces (window_start_time timestamp, current_time timestamp, sum_value bigint), then Flink automatically makes <udf_name>_<column_name> available in the select list. This is a little tricky, but in FLIP-145 some select columns such as window_start and window_end are also generated from the window semantics. It would be great if you have any other good ideas for this.
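
As a small illustration of the naming rule, the sketch below derives the select-list column names from a UDF name and its produced columns. The helper class is hypothetical and only shows the <udf_name>_<column_name> convention described above; how the planner would actually expose these columns is still open.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: prefixes each produced column
// with the window UDF name.
final class WindowColumnNames {
    static List<String> derive(String udfName, List<String> producedColumns) {
        List<String> names = new ArrayList<>();
        for (String column : producedColumns) {
            names.add(udfName + "_" + column);
        }
        return names;
    }
}
```

For ad_sum_window with columns window_start_time, current_time, and sum_value, this yields ad_sum_window_window_start_time, ad_sum_window_current_time, and ad_sum_window_sum_value, matching the select list in the example query.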




Proposed Changes
A new Calcite syntax to create window functions.
A new user-defined window function design, just like SourceFunction or ScalarFunction. The following logic can be defined in it:

Customized window type

Window assigner

Trigger

Produced result format
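
One possible shape for such an interface is sketched below, purely as a starting point for discussion. WindowUdf, TriggerAction, and HourlyWindowUdf are hypothetical names, not existing Flink classes; the four action values mirror the ones Flink's DataStream triggers already use (CONTINUE, FIRE, PURGE, FIRE_AND_PURGE).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical trigger outcome.
enum TriggerAction { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

// Hypothetical interface: W is the customized window type.
interface WindowUdf<IN, W> {
    /** Window assigner: the windows an element belongs to (may be empty). */
    List<W> assignWindows(IN element, long timestampMillis);

    /** Trigger: decision taken when an element arrives in a window. */
    TriggerAction onElement(IN element, long timestampMillis, W window);

    /** Trigger: decision taken when a registered timer fires. */
    TriggerAction onTimer(long timeMillis, W window);

    /** Produced result format: column names and types emitted by the window. */
    List<String> resultColumns();
}

// Toy implementation: one-hour tumbling windows identified by their start time.
// Assumes non-negative timestamps.
final class HourlyWindowUdf implements WindowUdf<String, Long> {
    private static final long HOUR = 3_600_000L;

    public List<Long> assignWindows(String element, long timestampMillis) {
        return Collections.singletonList(timestampMillis - (timestampMillis % HOUR));
    }

    public TriggerAction onElement(String element, long timestampMillis, Long window) {
        // A "purge" element clears the window; anything else just accumulates.
        return "purge".equals(element) ? TriggerAction.PURGE : TriggerAction.CONTINUE;
    }

    public TriggerAction onTimer(long timeMillis, Long window) {
        return timeMillis >= window + HOUR ? TriggerAction.FIRE_AND_PURGE : TriggerAction.CONTINUE;
    }

    public List<String> resultColumns() {
        return Arrays.asList(
            "window_start_time TIMESTAMP", "current_time TIMESTAMP", "sum_value BIGINT");
    }
}
```

Keeping the assigner, trigger, and result format in one class would let the planner infer the produced columns from the class definition alone, which is what the SQL example relies on.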


Thanks
Suhan