[jira] [Created] (FLINK-12215) Introduce SqlProcessFunction for blink streaming runtime

Shang Yuanchun (Jira)
Jark Wu created FLINK-12215:
-------------------------------

             Summary: Introduce SqlProcessFunction for blink streaming runtime
                 Key: FLINK-12215
                 URL: https://issues.apache.org/jira/browse/FLINK-12215
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / Runtime
            Reporter: Jark Wu


Currently, we rely heavily on DataStream's {{KeyedProcessFunction}} to implement the Blink SQL runtime. However, it has several disadvantages that lead us to introduce a SQL-specific ProcessFunction.

1. {{KeyedProcessFunction}} does not support {{endInput(Collector)}}.
This is needed to achieve the same semantics for batch and streaming. For example, {{SELECT COUNT(\*) FROM T}} should return {{0}} when the input is empty, but currently no result is emitted at all. That's why we need {{endInput(Collector)}} to emit a final result. I know this is not a real-world streaming use case, but it is worth supporting.
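
To illustrate the intended semantics, here is a minimal, self-contained sketch in plain Java (no Flink dependency). The names {{EndInputAwareFunction}}, {{GlobalCountFunction}} and {{EndInputDemo}} are hypothetical, and {{java.util.function.Consumer}} stands in for Flink's {{Collector}}. The output is also simplified: a real streaming SQL aggregation would emit retractions or updates rather than a plain sequence of counts.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the proposed function, reduced to the two callbacks
// that matter here. Consumer<O> plays the role of Flink's Collector<O>.
interface EndInputAwareFunction<I, O> {
    void processElement(I value, Consumer<O> out);

    // Called exactly once after the last input record of a bounded input.
    void endInput(Consumer<O> out);
}

// Global (non-grouped) COUNT(*): emits the running count per record and, if no
// record was ever seen, emits 0 at end of input so that empty input still
// yields one row, matching the batch result.
class GlobalCountFunction implements EndInputAwareFunction<Object, Long> {
    private long count = 0;

    @Override
    public void processElement(Object value, Consumer<Long> out) {
        count++;
        out.accept(count);
    }

    @Override
    public void endInput(Consumer<Long> out) {
        if (count == 0) {
            out.accept(0L);
        }
    }
}

class EndInputDemo {
    // Feeds all records, then signals end of input, collecting every emitted value.
    static List<Long> run(List<?> input) {
        List<Long> results = new ArrayList<>();
        GlobalCountFunction fn = new GlobalCountFunction();
        for (Object v : input) {
            fn.processElement(v, results::add);
        }
        fn.endInput(results::add);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of()));         // [0]
        System.out.println(run(List.of("a", "b"))); // [1, 2]
    }
}
{code}

With an empty input, only {{endInput}} produces output, so the result is {{[0]}}, the same row a batch query would return; with two records it is {{[1, 2]}} and {{endInput}} adds nothing.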

2. {{KeyedProcessFunction}} is an abstract class.
As discussed in FLINK-11409, if it were an interface, it would be easy to extract common logic into a base class and share it among ProcessFunction, CoProcessFunction, and other functions. This does not work while it is an abstract class, because a Java class can extend only one superclass. We also ran into this problem when trying to reuse some code. However, it is hard to turn {{KeyedProcessFunction}} into an interface without breaking compatibility.
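
The following sketch (plain Java, all names hypothetical) shows why interfaces help: the one-input and two-input contracts are declared as interfaces, so both concrete functions can extend one shared base class holding common logic. The null-filtering helper is only an assumed example of such "common logic".

{code:java}
import java.util.List;
import java.util.Locale;

// Hypothetical one-input and two-input function contracts, declared as interfaces.
interface OneInputFn<I, O> {
    void processElement(I value, List<O> out);
}

interface TwoInputFn<I1, I2, O> {
    void processElement1(I1 value, List<O> out);

    void processElement2(I2 value, List<O> out);
}

// Shared logic lives in a single base class that every function can extend,
// because the contracts above do not consume the single superclass slot.
abstract class NullFilteringBase {
    protected long droppedNulls = 0;

    // Common helper: returns true if the record should be processed.
    protected boolean accept(Object value) {
        if (value == null) {
            droppedNulls++;
            return false;
        }
        return true;
    }
}

class UpperCaseFn extends NullFilteringBase implements OneInputFn<String, String> {
    @Override
    public void processElement(String value, List<String> out) {
        if (accept(value)) {
            out.add(value.toUpperCase(Locale.ROOT));
        }
    }
}

class TaggingFn extends NullFilteringBase implements TwoInputFn<String, Integer, String> {
    @Override
    public void processElement1(String value, List<String> out) {
        if (accept(value)) {
            out.add("L:" + value);
        }
    }

    @Override
    public void processElement2(Integer value, List<String> out) {
        if (accept(value)) {
            out.add("R:" + value);
        }
    }
}
{code}

If {{OneInputFn}} and {{TwoInputFn}} were abstract classes, {{UpperCaseFn}} could not extend both its contract and {{NullFilteringBase}}, so the helper would have to be duplicated or moved into a separate delegate object.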

3. {{KeyedProcessFunction}} does not expose {{setCurrentKey}}.
We have an optimization for lazy state writing, i.e. buffering changes on the heap and flushing them to state when taking a snapshot. The flush has to change the current key of the operator/function for each buffered key.
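
A minimal sketch of the lazy-write pattern, assuming a toy keyed state whose reads and writes are scoped to the current key (roughly how keyed {{ValueState}} behaves in Flink). {{FakeKeyedValueState}} and {{LazySumBuffer}} are hypothetical names, and a per-key sum is just an example payload, not the actual Blink implementation.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for keyed state: every read and write is scoped to
// whatever key is currently set.
class FakeKeyedValueState<K, V> {
    private final Map<K, V> store = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    V value() {
        return store.get(currentKey);
    }

    void update(V value) {
        store.put(currentKey, value);
    }
}

// Buffers per-key sums on the heap and writes them to keyed state only when a
// snapshot is taken. The flush visits keys other than the current record's key,
// so it must switch the current key before each write.
class LazySumBuffer {
    private final Map<String, Long> buffer = new HashMap<>();
    private final FakeKeyedValueState<String, Long> state;

    LazySumBuffer(FakeKeyedValueState<String, Long> state) {
        this.state = state;
    }

    // Called per record: touches only the heap buffer, never the state.
    void add(String key, long delta) {
        buffer.merge(key, delta, Long::sum);
    }

    // Called on snapshot: merge each buffered delta into its key's state.
    void flush() {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            state.setCurrentKey(e.getKey());
            Long old = state.value();
            state.update(old == null ? e.getValue() : old + e.getValue());
        }
        buffer.clear();
    }
}
{code}

Between snapshots, records touch only the {{HashMap}}; the state is written once per key in {{flush()}}, which is exactly the step that needs {{setCurrentKey}}.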

That's why we want to introduce a SQL-specific {{ProcessFunction}} interface. Maybe we can call it {{SqlProcessFunction}}; the name is open for discussion in this JIRA.

The initial idea of {{SqlProcessFunction}}:


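{code:java}
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.util.Collector;

public interface SqlProcessFunction<K, I, O> extends Function {

    /** Processes one input element; may emit results and register timers. */
    void processElement(I value, Context<K> ctx, Collector<O> out) throws Exception;

    /** Called when a registered event-time or processing-time timer fires. */
    void onTimer(long timestamp, OnTimerContext<K> ctx, Collector<O> out) throws Exception;

    /** Called once after the last element of a bounded input, to emit final results. */
    void endInput(Collector<O> out) throws Exception;

    interface Context<K> {

        TimerService timerService();

        K getCurrentKey();

        /** Switches the current key, e.g. to flush buffered changes of other keys to state. */
        void setCurrentKey(K key);
    }

    interface OnTimerContext<K> extends Context<K> {

        /** Whether the firing timer is an event-time or a processing-time timer. */
        TimeDomain timeDomain();
    }
}
{code}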





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)