Jark Wu created FLINK-12215:
-------------------------------
Summary: Introduce SqlProcessFunction for blink streaming runtime
Key: FLINK-12215
URL: https://issues.apache.org/jira/browse/FLINK-12215
Project: Flink
Issue Type: New Feature
Components: Table SQL / Runtime
Reporter: Jark Wu
Currently, we rely heavily on DataStream's {{KeyedProcessFunction}} to implement the Blink SQL runtime. However, it has some disadvantages that lead us to introduce a dedicated ProcessFunction for SQL.
1. {{KeyedProcessFunction}} does not support {{endInput(Collector)}}.
This is needed to achieve the same semantics for batch and streaming. For example, {{SELECT COUNT(\*) FROM T}} should return {{0}} when the input is empty, but currently no result is emitted at all. That's why we need {{endInput(Collector)}} to emit a final result. I know this is not a real-world streaming use case, but it is worth doing.
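To illustrate why an end-of-input hook matters, here is a minimal, self-contained sketch of a global {{COUNT(\*)}} that emits an updated count for each element and, if no element ever arrived, emits {{0}} when the input ends. The types {{SimpleCollector}}, {{EndAwareFunction}}, {{CountStarFunction}} and {{CountStarDemo}} are hypothetical, simplified stand-ins and not Flink APIs. A real streaming aggregation would also emit retractions for previous results, which this sketch omits.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Flink's Collector (not the real API).
interface SimpleCollector<T> {
    void collect(T record);
}

// Hypothetical function contract with an end-of-input hook, as proposed above.
interface EndAwareFunction<I, O> {
    void processElement(I value, SimpleCollector<O> out);
    void endInput(SimpleCollector<O> out);
}

// Global COUNT(*): emits the running count per element; endInput covers the empty-input case.
class CountStarFunction<I> implements EndAwareFunction<I, Long> {
    private long count = 0L;

    @Override
    public void processElement(I value, SimpleCollector<Long> out) {
        count++;
        out.collect(count); // streaming update (retractions omitted in this sketch)
    }

    @Override
    public void endInput(SimpleCollector<Long> out) {
        // Without this hook, an empty input would produce no result at all.
        if (count == 0L) {
            out.collect(0L);
        }
    }
}

class CountStarDemo {
    // Drives the function over a bounded input and returns everything it emitted.
    static <I> List<Long> run(List<I> input) {
        List<Long> results = new ArrayList<>();
        SimpleCollector<Long> out = results::add;
        CountStarFunction<I> fn = new CountStarFunction<>();
        for (I value : input) {
            fn.processElement(value, out);
        }
        fn.endInput(out);
        return results;
    }
}
{code}

For an empty input the sketch emits a single {{0}}; for a non-empty input the per-element updates already carry the result, so {{endInput}} emits nothing extra.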
2. {{KeyedProcessFunction}} is an abstract class.
As discussed in FLINK-11409, if it were an interface, it would be easy to extract common logic into a base class and share it between {{ProcessFunction}}, {{CoProcessFunction}} and other functions. This doesn't work while it is an abstract class, because a Java class can extend only one superclass. We also ran into this problem when trying to reuse some code. However, it's hard to turn {{KeyedProcessFunction}} into an interface because of compatibility concerns.
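The crux of this point is Java's single class inheritance. The hypothetical sketch below (none of these types are Flink APIs) declares the one-input and two-input contracts as interfaces, so a single abstract base class carrying shared logic, here an illustrative state-retention helper, can be reused by both concrete functions. If the contracts were abstract classes, each concrete function would already spend its one superclass on them and could not also extend the shared base.

{code:java}
// Hypothetical one-input contract, declared as an interface.
interface OneInputFunction<I, O> {
    O process(I value, long now);
}

// Hypothetical two-input contract, declared as an interface.
interface TwoInputFunction<I1, I2, O> {
    O processLeft(I1 value, long now);
    O processRight(I2 value, long now);
}

// Shared logic lives in one abstract base class (illustrative retention helper).
abstract class RetentionSupport {
    private final long retentionMillis;

    RetentionSupport(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Timestamp at which state touched at 'now' would become eligible for cleanup.
    protected long cleanupTime(long now) {
        return now + retentionMillis;
    }
}

// Extends the shared base AND implements the one-input contract.
class OneInputWithRetention extends RetentionSupport implements OneInputFunction<String, Long> {
    OneInputWithRetention(long retentionMillis) {
        super(retentionMillis);
    }

    @Override
    public Long process(String value, long now) {
        return cleanupTime(now); // the cleanup timestamp this function would register
    }
}

// Reuses the same base class for the two-input contract.
class TwoInputWithRetention extends RetentionSupport implements TwoInputFunction<String, String, Long> {
    TwoInputWithRetention(long retentionMillis) {
        super(retentionMillis);
    }

    @Override
    public Long processLeft(String value, long now) {
        return cleanupTime(now);
    }

    @Override
    public Long processRight(String value, long now) {
        return cleanupTime(now);
    }
}
{code}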
3. {{KeyedProcessFunction}} doesn't expose {{setCurrentKey}}.
We have an optimization for lazy state writing, i.e. buffering changes on the heap and flushing them to state when taking a snapshot. This requires changing the current key of the operator/function.
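The following hypothetical sketch shows why lazy state writing needs a {{setCurrentKey}} hook. Per-key changes are first buffered in a heap {{HashMap}}; on snapshot, the buffer is iterated, the current key is switched for each entry, and the buffered value is merged into keyed state. {{SimpleKeyedBackend}} is an illustrative stand-in for a keyed state backend whose {{value()}}/{{update()}} are scoped to the current key, similar in spirit to Flink's keyed {{ValueState}}; it is not a Flink API, and {{LazyCountBuffer}} is likewise made up for illustration.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical keyed state backend: value()/update() act on the key last set via setCurrentKey().
class SimpleKeyedBackend<K, V> {
    private final Map<K, V> store = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) { this.currentKey = key; }
    K getCurrentKey() { return currentKey; }
    V value() { return store.get(currentKey); }
    void update(V v) { store.put(currentKey, v); }
    Map<K, V> contents() { return store; }
}

// Buffers per-key counts on the heap and writes them to keyed state only on snapshot.
class LazyCountBuffer<K> {
    private final SimpleKeyedBackend<K, Long> backend;
    private final Map<K, Long> buffer = new HashMap<>();

    LazyCountBuffer(SimpleKeyedBackend<K, Long> backend) {
        this.backend = backend;
    }

    // Heap-only update: no state access per element.
    void add(K key) {
        buffer.merge(key, 1L, Long::sum);
    }

    // On snapshot: switch to each buffered key, merge with stored state, and write it back.
    void flushOnSnapshot() {
        K originalKey = backend.getCurrentKey();
        for (Map.Entry<K, Long> e : buffer.entrySet()) {
            backend.setCurrentKey(e.getKey()); // this is the step that needs setCurrentKey
            Long stored = backend.value();
            backend.update((stored == null ? 0L : stored) + e.getValue());
        }
        buffer.clear();
        backend.setCurrentKey(originalKey); // restore the key the operator was processing
    }
}
{code}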
That's why we want to introduce a SQL-specific {{ProcessFunction}} interface, perhaps called {{SqlProcessFunction}}; the name can be discussed in this JIRA.
The initial idea of {{SqlProcessFunction}}:
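{code:java}
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.util.Collector;

public interface SqlProcessFunction<K, I, O> extends Function {
    // Called for each input element.
    void processElement(I value, Context<K> ctx, Collector<O> out) throws Exception;

    // Called when a registered timer fires.
    void onTimer(long timestamp, OnTimerContext<K> ctx, Collector<O> out) throws Exception;

    // Called once when the input ends, e.g. to emit a final result for bounded input.
    void endInput(Collector<O> out) throws Exception;

    interface Context<K> {
        TimerService timerService();
        K getCurrentKey();
        // Switches the key scope, e.g. to flush heap-buffered changes to keyed state.
        void setCurrentKey(K key);
    }

    interface OnTimerContext<K> extends Context<K> {
        // Whether the firing timer is an event-time or a processing-time timer.
        TimeDomain timeDomain();
    }
}
{code}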