[DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

classic Classic list List threaded Threaded
29 messages Options
12
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Jinkui Shi
Hi, Fabian, Shaoxuan, Yuhong

- OVER RANGE for processing time
I think your design make sense. Only considering processing time will simplify the design, make it robust.
The state will be saved in a queue, and the incoming data line will apply the given and user defined function one by one.
Do I understand right?

- OVER RANGE for event time
Sorted state is better for out-of-order message to insert. Sorted state maybe use linked list, if the state is enough huge, maybe the re-calculation will be slow, because it's not sequential memory data.
@shoxuan Do I understand right?

Thanks a lot

Best
Jinkui Shi

> On Jan 25, 2017, at 17:55, Fabian Hueske <[hidden email]> wrote:
>
> Hi everybody,
>
> thanks for the great discussions so far. It's awesome to see so much
> interest in this topic!
>
> First, I'd like to comment on the development process for this feature and
> later on the design of the runtime:
>
> Dev Process
> ----
> @Shaoxuan, I completely agree with you. We should first come up with good
> designs for the runtime operators of the different window types. Once we
> have that, we can start implementing the operators and integrate them with
> Calcite's optimization. This will be an intermediate step and as a
> byproduct give us support for SQL OVER windows. Once this is done, we can
> extend the Table API and translate the Table API calls into the same
> RelNodes as Calcite's SQL parser does.
>
> Runtime Design
> ----
> I think it makes sense to distinguish the different types of OVER windows
> because they have different requirements which result in different runtime
> implementations (with different implementation complexity and performance).
> In a previous mail I proposed to split the support for OVER windows into
> the following subtasks:
>
> # bounded PRECEDING
> - OVER ROWS for processing time
>  - does not require sorted state (data always arrives in processing time
> order)
>  - no need to consider retraction (processing time is never late)
>  - defines windows on row count.
>  - A GlobalWindow with evictor + trigger might be the best implementation
> (basically the same as DataStream.countWindow(long, long). We need to add
> timeouts to clean up state for non-used keys though.
>
> - OVER RANGE for processing time
>  - does not require sorted state (data always arrives in processing time
> order)
>  - no need to consider retraction (processing time is never late)
>  - defines windows on row count
>  - I think this could also be implemented with a GlobalWindow with evictor
> + trigger (need to verify)
>
> - OVER RANGE for event time
>  - need for sorted state (late data possible)
>  - IMO, a ProcessFunction gives us the most flexibility in adding later
> features (retraction, update rate, etc.)
>  - @Shaoxuan, you sketched a good design. Would you like to continue with
> a design proposal?
>
> # UNBOUNDED PRECEDING
> Similar considerations apply for the UNBOUNDED PRECEDING cases of the above
> window types.
>
> If we all agree that the separation into six JIRAs (bounded/unbounded *
> row-pt/range-pt/ range-et) makes sense, I would suggest to move the
> discussions about the design of the implementation to the individual JIRAs.
>
> What do think?
>
> Best, Fabian
>
> 2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <[hidden email]>:
>
>> Hi Liuxinchun,
>> I am not sure where did you get the inception: anyone has suggested "to
>> process Event time window in Sliding Row Window". If you were referring my
>> post, there may be some misunderstanding there. I think you were asking the
>> similar question as Hongyuhong. I have just replied to him. Please take a
>> look and let me know if that makes sense to you. "Retraction" is an
>> important building block to compute correct incremental results in
>> streaming. It is another big topic, we should discuss this in another
>> thread.
>>
>> Regards,
>> Shaoxuan
>>
>>
>>
>> On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <[hidden email]> wrote:
>>
>>> I don't think it is a good idea to process Event time window in Sliding
>>> Row Window. In Sliding Time window, when an element is late, we can
>> trigger
>>> the recalculation of the related windows. And the sliding period is
>>> coarse-gained, We only need to recalculate size/sliding number of
>> windows.
>>> But in Sliding Row Window, the calculation is triggered when every
>> element
>>> is coming. The sliding period is becoming fine-gained. When an element is
>>> late, there are so many "windows" are influenced. Even if we store all
>> the
>>> raw data, the computation is very large.
>>>
>>> I think if it is possible to set a standard to sliding Event Time Row
>>> Window, When certain elements are late, we can only recalculate partial
>>> windows and permit some error. For example, we can only recalculate the
>>> windows end in range between (lateElement.timestamp - leftDelta,
>>> lateElement.timestamp] and those windows begin in range between
>>> [lateElement.timestamp, lateElement.timestamp + rightDelta).
>>> ////////////////////////////////////////////////////////////
>>> //////////////////////////
>>> Hi everyone,
>>> Thanks for this great discussion, and glad to see more and more people
>> are
>>> interested on stream SQL & tableAPI.
>>>
>>> IMO, the key problems for Over window design are the SQL semantics and
>> the
>>> runtime design. I totally agree with Fabian that we should skip the
>> design
>>> of TumbleRows and SessionRows windows for now, as they are not well
>> defined
>>> in SQL semantics.
>>>
>>> Runtime design is the most crucial part we are interested in and
>>> volunteered to contribute into. We have thousands of machines running
>> flink
>>> streaming jobs. The costs in terms of CPU, memory, and state are the
>> vital
>>> factors that we have to taken into account. We have been working on the
>>> design of OVER window in the past months, and planning to send out a
>>> detailed design doc to DEV quite soon. But since Fabian started a good
>>> discussion on OVER window, I would like to share our ideas/thoughts about
>>> the runtime design for OVER window.
>>>
>>>   1. As SunJincheng pointed out earlier, sliding window does not work
>> for
>>>   unbounded preceding, we need alternative approach for unbound over
>>> window.
>>>   2. Though sliding window may work for some cases of bounded window,
>>>   it is not very efficient thereby should not be used for production. To
>>> the
>>>   best of my understanding, the current runtime implementation of
>> sliding
>>>   window has not leveraged the concepts of state Panes yet. This means
>>> that
>>>   if we use sliding window for OVER window,  there will be a backend
>> state
>>>   created per each group (partition by) and each row, and whenever a new
>>>   record arrives, it will be accumulated to all the existing windows
>> that
>>> has
>>>   not been closed. This would cause quite a lot of overhead in terms of
>>> both
>>>   CPU and memory&state.
>>>   3. Fabian has mentioned an approach of leveraging “ProcessFunction”
>> and
>>>   a “sortedState”. I like this idea. The design details on this are not
>>> quite
>>>   clear yet. So I would like to add more thoughts on this. Regardless
>>>   which dataStream API we are going to use (it is very likely that we
>> need
>>>   a new API), we should come out with an optimal approach. The purpose
>> of
>>>   grouping window and over window is to partition the data, such that we
>>> can
>>>   generate the aggregate results. So when we talk about the design of
>> OVER
>>>   window, we have to think about the aggregates. As we proposed in our
>>> recent
>>>   UDAGG doc https://goo.gl/6ntclB,  the user defined accumulator will
>> be
>>>   stored in the aggregate state. Besides accumulator, we have also
>>> introduced
>>>   a retract API for UDAGG. With aggregate accumulator and retract API, I
>>> am
>>>   proposing a runtime approach to implement the OVER window as
>> followings.
>>>   4.
>>>      - We first implement a sorted state interface
>>>      - Per each group, we just create one sorted state. When a new
>> record
>>>      arrives, it will insert into this sorted state, in the meanwhile it
>>> will be
>>>      accumulated to the aggregate accumulator.
>>>      - For over window, we keep the aggregate accumulator for the entire
>>>      job lifelong time. This is different than the case where we delete
>>> the
>>>      accumulator for each group/window when a grouping-window is
>> finished.
>>>      - When an over window is up to trigger, we grab the
>>>      previous accumulator from the state and accumulate values onto it
>>> with all
>>>      the records till the upperBoundary of the current window, and
>>> retract all
>>>      the out of scope records till its lowerBoundary. We emit the
>>>      aggregate result and save the accumulator for the next window.
>>>
>>>
>>> Hello Fabian,
>>> I would suggest we should first start working on runtime design of over
>>> window and aggregate. Once we have a good design there, one can easily
>> add
>>> the support for SQL as well as tableAPI. What do you think?
>>>
>>> Regards,
>>> Shaoxuan
>>>
>>> On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <[hidden email]>
>> wrote:
>>>
>>>> Hi Radu,
>>>>
>>>> thanks for your comments!
>>>>
>>>> Yes, my intention is to open new JIRA issues to structure the
>>>> development process. Everybody is very welcome to pick up issues and
>>>> discuss the design proposals.
>>>> At the moment I see the following six issues to start with:
>>>>
>>>> - streaming SQL OVER ROW for processing time
>>>>  - bounded PRECEDING
>>>>  - unbounded PRECEDING
>>>>
>>>> - streaming SQL OVER RANGE for processing time
>>>>  - bounded PRECEDING
>>>>  - unbounded PRECEDING
>>>>
>>>> - streaming SQL OVER RANGE for event time
>>>>  - bounded PRECEDING
>>>>  - unbounded PRECEDING
>>>>
>>>> For each of these windows we need corresponding translation rules and
>>>> execution code.
>>>>
>>>> Subsequent JIRAs would be
>>>> - extending the Table API for supported SQL windows
>>>> - add support for FOLLOWING
>>>> - etc.
>>>>
>>>> Regarding the requirement for a sorted state. I am not sure if the
>>>> OVER windows should be implemented using Flink's DataStream window
>>> framework.
>>>> We need a good design document to figure out what is the best
>>>> approach. A ProcessFunction with a sorted state might be a good
>> solution
>>> as well.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>> 2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]>:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Thanks for starting these discussion - it is very useful.
>>>>> It does make sense indeed to refactor all these and coordinate a bit
>>>>> the efforts not to have overlapping implementations and incompatible
>>>> solutions.
>>>>>
>>>>> If you close the 3 jira issues you mentioned - do you plan to
>>>>> redesign them and open new ones? Do you need help from our side - we
>>>>> can also pick the redesign of some of these new jira issues. For
>>>>> example we already
>>>> have
>>>>> an implementation for this and we can help with the design.
>>>>> Nevertheless, let's coordinate the effort.
>>>>>
>>>>> Regarding the support for the different types of window - I think
>>>>> the
>>>> best
>>>>> option is to split the implementation in small units. We can easily
>>>>> do
>>>> this
>>>>> from the transformation rule class and with this each particular
>>>>> type of window (session/sliding/sliderows/processing time/...) will
>>>>> have a clear implementation and a corresponding architecture within
>>> the jira issue?
>>>> What
>>>>> do you think about such a granularity?
>>>>>
>>>>> Regarding the issue of " Q4: The implementaion of SlideRows still
>>>>> need a custom operator that collects records in a priority queue
>>>>> ordered by the "rowtime", which is similar to the design we
>>>>> discussed in FLINK-4697, right? "
>>>>> Why would you need this operator? The window buffer can act to some
>>>> extent
>>>>> as a priority queue as long as the trigger and evictor is set to
>>>>> work
>>>> based
>>>>> on the rowtime - or maybe I am missing something... Can you please
>>>> clarify
>>>>> this.
>>>>>
>>>>>
>>>>> Dr. Radu Tudoran
>>>>> Senior Research Engineer - Big Data Expert IT R&D Division
>>>>>
>>>>>
>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>>> European Research Center
>>>>> Riesstrasse 25, 80992 München
>>>>>
>>>>> E-mail: [hidden email]
>>>>> Mobile: +49 15209084330
>>>>> Telephone: +49 891588344173
>>>>>
>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
>>>>> This e-mail and its attachments contain confidential information from
>>>>> HUAWEI, which is intended only for the person or entity whose address
>>> is
>>>>> listed above. Any use of the information contained herein in any way
>>>>> (including, but not limited to, total or partial disclosure,
>>>> reproduction,
>>>>> or dissemination) by persons other than the intended recipient(s) is
>>>>> prohibited. If you receive this e-mail in error, please notify the
>>> sender
>>>>> by phone or email immediately and delete it!
>>>>>
>>>>>
>>>>> -----Original Message-----
>>>>> From: Jark Wu [mailto:[hidden email]]
>>>>> Sent: Tuesday, January 24, 2017 6:53 AM
>>>>> To: [hidden email]
>>>>> Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
>> Windows
>>>> for
>>>>> streaming tables
>>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Thanks for bringing up this discussion and the nice approach to avoid
>>>>> overlapping contributions.
>>>>>
>>>>> All of these make sense to me. But I have some questions.
>>>>>
>>>>> Q1: If I understand correctly, we will not support TumbleRows and
>>>>> SessionRows at the beginning. But maybe support them as a syntax
>> sugar
>>>> (in
>>>>> Table API) when the SlideRows is supported in the future. Right ?
>>>>>
>>>>> Q2: How to support SessionRows based on SlideRows ?  I don't get how
>> to
>>>>> partition on "gap-separated".
>>>>>
>>>>> Q3: Should we break down the approach into smaller tasks for
>> streaming
>>>>> tables and batch tables ?
>>>>>
>>>>> Q4: The implementaion of SlideRows still need a custom operator that
>>>>> collects records in a priority queue ordered by the "rowtime", which
>> is
>>>>> similar to the design we discussed in FLINK-4697, right?
>>>>>
>>>>> +1 not support for OVER ROW for event time at this point.
>>>>>
>>>>> Regards, Jark
>>>>>
>>>>>
>>>>>> 在 2017年1月24日,上午10:28,Hongyuhong <[hidden email]> 写道:
>>>>>>
>>>>>> Hi,
>>>>>> We are also interested in streaming sql and very willing to
>>> participate
>>>>> and contribute.
>>>>>>
>>>>>> We are now in progress and we will also contribute to calcite to
>> push
>>>>> forward the window and stream-join support.
>>>>>>
>>>>>>
>>>>>>
>>>>>> --------------
>>>>>> Sender: Fabian Hueske [mailto:[hidden email]] Send Time:
>>> 2017年1月24日
>>>>>> 5:55
>>>>>> Receiver: [hidden email]
>>>>>> Theme: Re: [DISCUSS] Development of SQL OVER / Table API Row
>> Windows
>>>>>> for streaming tables
>>>>>>
>>>>>> Hi Haohui,
>>>>>>
>>>>>> our plan was in fact to piggy-back on Calcite and use the TUMBLE
>>>>> function [1] once is it is available (CALCITE-1345 [2]).
>>>>>> Unfortunately, this issue does not seem to be very active, so I
>> don't
>>>>> know what the progress is.
>>>>>>
>>>>>> I would suggest to move the discussion about group windows to a
>>>> separate
>>>>> thread and keep this one focused on the organization of the SQL OVER
>>>>> windows.
>>>>>>
>>>>>> Best,
>>>>>> Fabian
>>>>>>
>>>>>> [1] http://calcite.apache.org/docs/stream.html)
>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345
>>>>>>
>>>>>> 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
>>>>>>
>>>>>>> Hi Fabian,
>>>>>>>
>>>>>>> FLINK-4692 has added the support for tumbling window and we are
>>>>>>> excited to try it out and expose it as a SQL construct.
>>>>>>>
>>>>>>> Just curious -- what's your thought on the SQL syntax on tumbling
>>>>> window?
>>>>>>>
>>>>>>> Implementation wise it might make sense to think tumbling window
>> as
>>> a
>>>>>>> special case of the sliding window.
>>>>>>>
>>>>>>> The problem I see is that the OVER construct might be insufficient
>>> to
>>>>>>> support all the use cases of tumbling windows. For example, it
>> fails
>>>>>>> to express tumbling windows that have fractional time units (as
>>>>>>> pointed out in http://calcite.apache.org/docs/stream.html).
>>>>>>>
>>>>>>> It looks to me that the Calcite / Azure Stream Analytics have
>>>>>>> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address
>> this
>>>>> issue.
>>>>>>>
>>>>>>> Do you think it is a good idea to follow the same conventions?
>> Your
>>>>>>> ideas are appreciated.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Haohui
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <[hidden email]>
>>>> wrote:
>>>>>>>
>>>>>>>> +1
>>>>>>>>
>>>>>>>> We are also quite interested in these features and would love to
>>>>>>>> participate and contribute.
>>>>>>>>
>>>>>>>> ~Haohui
>>>>>>>>
>>>>>>>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <[hidden email]
>>>
>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi everybody,
>>>>>>>>>
>>>>>>>>> it seems that currently several contributors are working on new
>>>>>>>>> features for the streaming Table API / SQL around row windows
>> (as
>>>>>>>>> defined in
>>>>>>>>> FLIP-11
>>>>>>>>> [1]) and SQL OVER-style window (FLINK-4678, FLINK-4679,
>>> FLINK-4680,
>>>>>>>>> FLINK-5584).
>>>>>>>>> Since these efforts overlap quite a bit I spent some time
>> thinking
>>>>>>>>> about how we can approach these features and how to avoid
>>>>>>>>> overlapping contributions.
>>>>>>>>>
>>>>>>>>> The challenge here is the following. Some of the Table API row
>>>>>>>>> windows
>>>>>>> as
>>>>>>>>> defined by FLIP-11 [1] are basically SQL OVER windows while
>> other
>>>>>>>>> cannot be easily expressed as such (TumbleRows for row-count
>>>>>>>>> intervals, SessionRows).
>>>>>>>>> However, since Calcite already supports SQL OVER windows, we can
>>>>>>>>> reuse
>>>>>>> the
>>>>>>>>> optimization logic for some of the Table API row windows. I also
>>>>>>>>> thought about the semantics of the TumbleRows and SessionRows
>>>>>>>>> windows as defined in
>>>>>>>>> FLIP-11 and came to the conclusion that these are not well
>> defined
>>>>>>>>> in
>>>>>>>>> FLIP-11 and should rather be defined as SlideRows windows with a
>>>>>>>>> special PARTITION BY clause.
>>>>>>>>>
>>>>>>>>> I propose to approach SQL OVER windows and Table API row windows
>>> as
>>>>>>>>> follows:
>>>>>>>>>
>>>>>>>>> We start with three simple cases for SQL OVER windows (not Table
>>>>>>>>> API
>>>>>>> yet):
>>>>>>>>>
>>>>>>>>> * OVER RANGE for event time
>>>>>>>>> * OVER RANGE for processing time
>>>>>>>>> * OVER ROW for processing time
>>>>>>>>>
>>>>>>>>> All cases fulfill the following restrictions:
>>>>>>>>> - All aggregations in SELECT must refer to the same window.
>>>>>>>>> - PARTITION BY may not contain the rowtime attribute.
>>>>>>>>> - ORDER BY must be on rowtime attribute (for event time) or on a
>>>>>>>>> marker function that indicates processing time. Additional sort
>>>>>>>>> attributes are not supported initially.
>>>>>>>>> - only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and
>> "BETWEEN
>>> x
>>>>>>>>> PRECEDING AND CURRENT ROW" are supported.
>>>>>>>>>
>>>>>>>>> OVER ROW for event time cannot be easily supported. With event
>>>>>>>>> time, we may have late records which need to be injected into
>> the
>>>>>>>>> order of records.
>>>>>>>>> When
>>>>>>>>> a record in injected in to the order where a row-count window
>> has
>>>>>>> already
>>>>>>>>> been computed, this and all following windows will change. We
>>> could
>>>>>>> either
>>>>>>>>> drop the record or sent out many retraction records. I think it
>> is
>>>>>>>>> best
>>>>>>> to
>>>>>>>>> not open this can of worms at this point.
>>>>>>>>>
>>>>>>>>> The rational for all of the above restrictions is to have first
>>>>>>>>> versions of OVER windows soon.
>>>>>>>>> Once we have the above cases covered we can extend and remove
>>>>>>> limitations
>>>>>>>>> as follows:
>>>>>>>>>
>>>>>>>>> - Table API SlideRow windows (with the same restrictions as
>>> above).
>>>>>>>>> This will be mostly API work since the execution part has been
>>>> solved
>>>>> before.
>>>>>>>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
>>>>>>>>> - Add support for different windows in SELECT. All windows must
>> be
>>>>>>>>> partitioned and ordered in the same way.
>>>>>>>>> - Add support for additional ORDER BY attributes (besides time).
>>>>>>>>>
>>>>>>>>> As I said before, TumbleRows and SessionRows windows as in
>> FLIP-11
>>>>>>>>> are
>>>>>>> not
>>>>>>>>> well defined, IMO.
>>>>>>>>> They can be expressed as SlideRows windows with special
>>>>>>>>> partitioning (partitioning on fixed, non-overlapping time ranges
>>>>>>>>> for TumbleRows, and gap-separated, non-overlapping time ranges
>> for
>>>>>>>>> SessionRows) I would not start to work on those yet.
>>>>>>>>>
>>>>>>>>> I would like to close all related JIRA issues (FLINK-4678,
>>>>>>>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the
>>> development
>>>>>>>>> of these
>>>>>>> features
>>>>>>>>> as outlined above with corresponding JIRA issues.
>>>>>>>>>
>>>>>>>>> What do others think? (I cc'ed the contributors assigned to the
>>>>>>>>> above
>>>>>>> JIRA
>>>>>>>>> issues)
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
>>>>>>> 11%3A+Table+API+Stream+Aggregations
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>


Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Shaoxuan Wang
In reply to this post by Fabian Hueske-2
Yes Fabian,
I will complete my design with more thorough thoughts. BTW, I think the
incremental aggregate (the key point I suggested is to eliminate state per
each window) I proposed should work for both processing time and event
time. It just does not need a sorted state for the processing time
scenarios. (Need to verify).

Regards,
Shaoxuan


On Wed, Jan 25, 2017 at 5:55 PM, Fabian Hueske <[hidden email]> wrote:

> Hi everybody,
>
> thanks for the great discussions so far. It's awesome to see so much
> interest in this topic!
>
> First, I'd like to comment on the development process for this feature and
> later on the design of the runtime:
>
> Dev Process
> ----
> @Shaoxuan, I completely agree with you. We should first come up with good
> designs for the runtime operators of the different window types. Once we
> have that, we can start implementing the operators and integrate them with
> Calcite's optimization. This will be an intermediate step and as a
> byproduct give us support for SQL OVER windows. Once this is done, we can
> extend the Table API and translate the Table API calls into the same
> RelNodes as Calcite's SQL parser does.
>
> Runtime Design
> ----
> I think it makes sense to distinguish the different types of OVER windows
> because they have different requirements which result in different runtime
> implementations (with different implementation complexity and performance).
> In a previous mail I proposed to split the support for OVER windows into
> the following subtasks:
>
> # bounded PRECEDING
> - OVER ROWS for processing time
>   - does not require sorted state (data always arrives in processing time
> order)
>   - no need to consider retraction (processing time is never late)
>   - defines windows on row count.
>   - A GlobalWindow with evictor + trigger might be the best implementation
> (basically the same as DataStream.countWindow(long, long). We need to add
> timeouts to clean up state for non-used keys though.
>
> - OVER RANGE for processing time
>   - does not require sorted state (data always arrives in processing time
> order)
>   - no need to consider retraction (processing time is never late)
>   - defines windows on row count
>   - I think this could also be implemented with a GlobalWindow with evictor
> + trigger (need to verify)
>
> - OVER RANGE for event time
>   - need for sorted state (late data possible)
>   - IMO, a ProcessFunction gives us the most flexibility in adding later
> features (retraction, update rate, etc.)
>   - @Shaoxuan, you sketched a good design. Would you like to continue with
> a design proposal?
>
> # UNBOUNDED PRECEDING
> Similar considerations apply for the UNBOUNDED PRECEDING cases of the above
> window types.
>
> If we all agree that the separation into six JIRAs (bounded/unbounded *
> row-pt/range-pt/ range-et) makes sense, I would suggest to move the
> discussions about the design of the implementation to the individual JIRAs.
>
> What do think?
>
> Best, Fabian
>
> 2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <[hidden email]>:
>
> > Hi Liuxinchun,
> > I am not sure where did you get the inception: anyone has suggested "to
> > process Event time window in Sliding Row Window". If you were referring
> my
> > post, there may be some misunderstanding there. I think you were asking
> the
> > similar question as Hongyuhong. I have just replied to him. Please take a
> > look and let me know if that makes sense to you. "Retraction" is an
> > important building block to compute correct incremental results in
> > streaming. It is another big topic, we should discuss this in another
> > thread.
> >
> > Regards,
> > Shaoxuan
> >
> >
> >
> > On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <[hidden email]>
> wrote:
> >
> > > I don't think it is a good idea to process Event time window in Sliding
> > > Row Window. In Sliding Time window, when an element is late, we can
> > trigger
> > > the recalculation of the related windows. And the sliding period is
> > > coarse-gained, We only need to recalculate size/sliding number of
> > windows.
> > > But in Sliding Row Window, the calculation is triggered when every
> > element
> > > is coming. The sliding period is becoming fine-gained. When an element
> is
> > > late, there are so many "windows" are influenced. Even if we store all
> > the
> > > raw data, the computation is very large.
> > >
> > > I think if it is possible to set a standard to sliding Event Time Row
> > > Window, When certain elements are late, we can only recalculate partial
> > > windows and permit some error. For example, we can only recalculate the
> > > windows end in range between (lateElement.timestamp - leftDelta,
> > > lateElement.timestamp] and those windows begin in range between
> > > [lateElement.timestamp, lateElement.timestamp + rightDelta).
> > > ////////////////////////////////////////////////////////////
> > > //////////////////////////
> > >  Hi everyone,
> > > Thanks for this great discussion, and glad to see more and more people
> > are
> > > interested on stream SQL & tableAPI.
> > >
> > > IMO, the key problems for Over window design are the SQL semantics and
> > the
> > > runtime design. I totally agree with Fabian that we should skip the
> > design
> > > of TumbleRows and SessionRows windows for now, as they are not well
> > defined
> > > in SQL semantics.
> > >
> > > Runtime design is the most crucial part we are interested in and
> > > volunteered to contribute into. We have thousands of machines running
> > flink
> > > streaming jobs. The costs in terms of CPU, memory, and state are the
> > vital
> > > factors that we have to taken into account. We have been working on the
> > > design of OVER window in the past months, and planning to send out a
> > > detailed design doc to DEV quite soon. But since Fabian started a good
> > > discussion on OVER window, I would like to share our ideas/thoughts
> about
> > > the runtime design for OVER window.
> > >
> > >    1. As SunJincheng pointed out earlier, sliding window does not work
> > for
> > >    unbounded preceding, we need alternative approach for unbound over
> > > window.
> > >    2. Though sliding window may work for some cases of bounded window,
> > >    it is not very efficient thereby should not be used for production.
> To
> > > the
> > >    best of my understanding, the current runtime implementation of
> > sliding
> > >    window has not leveraged the concepts of state Panes yet. This means
> > > that
> > >    if we use sliding window for OVER window,  there will be a backend
> > state
> > >    created per each group (partition by) and each row, and whenever a
> new
> > >    record arrives, it will be accumulated to all the existing windows
> > that
> > > has
> > >    not been closed. This would cause quite a lot of overhead in terms
> of
> > > both
> > >    CPU and memory&state.
> > >    3. Fabian has mentioned an approach of leveraging “ProcessFunction”
> > and
> > >    a “sortedState”. I like this idea. The design details on this are
> not
> > > quite
> > >    clear yet. So I would like to add more thoughts on this. Regardless
> > >    which dataStream API we are going to use (it is very likely that we
> > need
> > >    a new API), we should come out with an optimal approach. The purpose
> > of
> > >    grouping window and over window is to partition the data, such that
> we
> > > can
> > >    generate the aggregate results. So when we talk about the design of
> > OVER
> > >    window, we have to think about the aggregates. As we proposed in our
> > > recent
> > >    UDAGG doc https://goo.gl/6ntclB,  the user defined accumulator will
> > be
> > >    stored in the aggregate state. Besides accumulator, we have also
> > > introduced
> > >    a retract API for UDAGG. With aggregate accumulator and retract
> API, I
> > > am
> > >    proposing a runtime approach to implement the OVER window as
> > followings.
> > >    4.
> > >       - We first implement a sorted state interface
> > >       - Per each group, we just create one sorted state. When a new
> > record
> > >       arrives, it will insert into this sorted state, in the meanwhile
> it
> > > will be
> > >       accumulated to the aggregate accumulator.
> > >       - For over window, we keep the aggregate accumulator for the
> entire
> > >       job lifelong time. This is different than the case where we
> delete
> > > the
> > >       accumulator for each group/window when a grouping-window is
> > finished.
> > >       - When an over window is up to trigger, we grab the
> > >       previous accumulator from the state and accumulate values onto it
> > > with all
> > >       the records till the upperBoundary of the current window, and
> > > retract all
> > >       the out of scope records till its lowerBoundary. We emit the
> > >       aggregate result and save the accumulator for the next window.
> > >
> > >
> > > Hello Fabian,
> > > I would suggest we should first start working on runtime design of over
> > > window and aggregate. Once we have a good design there, one can easily
> > add
> > > the support for SQL as well as tableAPI. What do you think?
> > >
> > > Regards,
> > > Shaoxuan
> > >
> > > On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <[hidden email]>
> > wrote:
> > >
> > > > Hi Radu,
> > > >
> > > > thanks for your comments!
> > > >
> > > > Yes, my intention is to open new JIRA issues to structure the
> > > > development process. Everybody is very welcome to pick up issues and
> > > > discuss the design proposals.
> > > > At the moment I see the following six issues to start with:
> > > >
> > > > - streaming SQL OVER ROW for processing time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > - streaming SQL OVER RANGE for processing time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > - streaming SQL OVER RANGE for event time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > For each of these windows we need corresponding translation rules and
> > > > execution code.
> > > >
> > > > Subsequent JIRAs would be
> > > > - extending the Table API for supported SQL windows
> > > > - add support for FOLLOWING
> > > > - etc.
> > > >
> > > > Regarding the requirement for a sorted state. I am not sure if the
> > > > OVER windows should be implemented using Flink's DataStream window
> > > framework.
> > > > We need a good design document to figure out what is the best
> > > > approach. A ProcessFunction with a sorted state might be a good
> > solution
> > > as well.
> > > >
> > > > Best, Fabian
> > > >
> > > >
> > > > 2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]>:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Thanks for starting these discussion - it is very useful.
> > > > > It does make sense indeed to refactor all these and coordinate a
> bit
> > > > > the efforts not to have overlapping implementations and
> incompatible
> > > > solutions.
> > > > >
> > > > > If you close the 3 jira issues you mentioned - do you plan to
> > > > > redesign them and open new ones? Do you need help from our side -
> we
> > > > > can also pick the redesign of some of these new jira issues. For
> > > > > example we already
> > > > have
> > > > > an implementation for this and we can help with the design.
> > > > > Nevertheless, let's coordinate the effort.
> > > > >
> > > > > Regarding the support for the different types of window - I think
> > > > > the
> > > > best
> > > > > option is to split the implementation in small units. We can easily
> > > > > do
> > > > this
> > > > > from the transformation rule class and with this each particular
> > > > > type of window (session/sliding/sliderows/processing time/...)
> will
> > > > > have a clear implementation and a corresponding architecture within
> > > the jira issue?
> > > > What
> > > > > do you think about such a granularity?
> > > > >
> > > > > Regarding the issue of " Q4: The implementaion of SlideRows still
> > > > > need a custom operator that collects records in a priority queue
> > > > > ordered by the "rowtime", which is similar to the design we
> > > > > discussed in FLINK-4697, right? "
> > > > > Why would you need this operator? The window buffer can act to some
> > > > extent
> > > > > as a priority queue as long as the trigger and evictor is set to
> > > > > work
> > > > based
> > > > > on the rowtime - or maybe I am missing something... Can you please
> > > > clarify
> > > > > this.
> > > > >
> > > > >
> > > > > Dr. Radu Tudoran
> > > > > Senior Research Engineer - Big Data Expert IT R&D Division
> > > > >
> > > > >
> > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > > European Research Center
> > > > > Riesstrasse 25, 80992 München
> > > > >
> > > > > E-mail: [hidden email]
> > > > > Mobile: +49 15209084330
> > > > > Telephone: +49 891588344173
> > > > >
> > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > > > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB
> 56063,
> > > > > Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> > > > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB
> 56063,
> > > > > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> > > > > This e-mail and its attachments contain confidential information
> from
> > > > > HUAWEI, which is intended only for the person or entity whose
> address
> > > is
> > > > > listed above. Any use of the information contained herein in any
> way
> > > > > (including, but not limited to, total or partial disclosure,
> > > > reproduction,
> > > > > or dissemination) by persons other than the intended recipient(s)
> is
> > > > > prohibited. If you receive this e-mail in error, please notify the
> > > sender
> > > > > by phone or email immediately and delete it!
> > > > >
> > > > >
> > > > > -----Original Message-----
> > > > > From: Jark Wu [mailto:[hidden email]]
> > > > > Sent: Tuesday, January 24, 2017 6:53 AM
> > > > > To: [hidden email]
> > > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
> > Windows
> > > > for
> > > > > streaming tables
> > > > >
> > > > > Hi Fabian,
> > > > >
> > > > > Thanks for bringing up this discussion and the nice approach to
> avoid
> > > > > overlapping contributions.
> > > > >
> > > > > All of these make sense to me. But I have some questions.
> > > > >
> > > > > Q1: If I understand correctly, we will not support TumbleRows and
> > > > > SessionRows at the beginning. But maybe support them as a syntax
> > sugar
> > > > (in
> > > > > Table API) when the SlideRows is supported in the future. Right ?
> > > > >
> > > > > Q2: How to support SessionRows based on SlideRows ?  I don't get
> how
> > to
> > > > > partition on "gap-separated".
> > > > >
> > > > > Q3: Should we break down the approach into smaller tasks for
> > streaming
> > > > > tables and batch tables ?
> > > > >
> > > > > Q4: The implementaion of SlideRows still need a custom operator
> that
> > > > > collects records in a priority queue ordered by the "rowtime",
> which
> > is
> > > > > similar to the design we discussed in FLINK-4697, right?
> > > > >
> > > > > +1 not support for OVER ROW for event time at this point.
> > > > >
> > > > > Regards, Jark
> > > > >
> > > > >
> > > > > > 在 2017年1月24日,上午10:28,Hongyuhong <[hidden email]> 写道:
> > > > > >
> > > > > > Hi,
> > > > > > We are also interested in streaming sql and very willing to
> > > participate
> > > > > and contribute.
> > > > > >
> > > > > > We are now in progress and we will also contribute to calcite to
> > push
> > > > > forward the window and stream-join support.
> > > > > >
> > > > > >
> > > > > >
> > > > > > --------------
> > > > > > Sender: Fabian Hueske [mailto:[hidden email]] Send Time:
> > > 2017年1月24日
> > > > > > 5:55
> > > > > > Receiver: [hidden email]
> > > > > > Theme: Re: [DISCUSS] Development of SQL OVER / Table API Row
> > Windows
> > > > > > for streaming tables
> > > > > >
> > > > > > Hi Haohui,
> > > > > >
> > > > > > our plan was in fact to piggy-back on Calcite and use the TUMBLE
> > > > > function [1] once is it is available (CALCITE-1345 [2]).
> > > > > > Unfortunately, this issue does not seem to be very active, so I
> > don't
> > > > > know what the progress is.
> > > > > >
> > > > > > I would suggest to move the discussion about group windows to a
> > > > separate
> > > > > thread and keep this one focused on the organization of the SQL
> OVER
> > > > > windows.
> > > > > >
> > > > > > Best,
> > > > > > Fabian
> > > > > >
> > > > > > [1] http://calcite.apache.org/docs/stream.html)
> > > > > > [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > > > > >
> > > > > > 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
> > > > > >
> > > > > >> Hi Fabian,
> > > > > >>
> > > > > >> FLINK-4692 has added the support for tumbling window and we are
> > > > > >> excited to try it out and expose it as a SQL construct.
> > > > > >>
> > > > > >> Just curious -- what's your thought on the SQL syntax on
> tumbling
> > > > > window?
> > > > > >>
> > > > > >> Implementation wise it might make sense to think tumbling window
> > as
> > > a
> > > > > >> special case of the sliding window.
> > > > > >>
> > > > > >> The problem I see is that the OVER construct might be
> insufficient
> > > to
> > > > > >> support all the use cases of tumbling windows. For example, it
> > fails
> > > > > >> to express tumbling windows that have fractional time units (as
> > > > > >> pointed out in http://calcite.apache.org/docs/stream.html).
> > > > > >>
> > > > > >> It looks to me that the Calcite / Azure Stream Analytics have
> > > > > >> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address
> > this
> > > > > issue.
> > > > > >>
> > > > > >> Do you think it is a good idea to follow the same conventions?
> > Your
> > > > > >> ideas are appreciated.
> > > > > >>
> > > > > >> Regards,
> > > > > >> Haohui
> > > > > >>
> > > > > >>
> > > > > >> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <[hidden email]>
> > > > wrote:
> > > > > >>
> > > > > >>> +1
> > > > > >>>
> > > > > >>> We are also quite interested in these features and would love
> to
> > > > > >>> participate and contribute.
> > > > > >>>
> > > > > >>> ~Haohui
> > > > > >>>
> > > > > >>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <
> [hidden email]
> > >
> > > > > wrote:
> > > > > >>>
> > > > > >>>> Hi everybody,
> > > > > >>>>
> > > > > >>>> it seems that currently several contributors are working on
> new
> > > > > >>>> features for the streaming Table API / SQL around row windows
> > (as
> > > > > >>>> defined in
> > > > > >>>> FLIP-11
> > > > > >>>> [1]) and SQL OVER-style window (FLINK-4678, FLINK-4679,
> > > FLINK-4680,
> > > > > >>>> FLINK-5584).
> > > > > >>>> Since these efforts overlap quite a bit I spent some time
> > thinking
> > > > > >>>> about how we can approach these features and how to avoid
> > > > > >>>> overlapping contributions.
> > > > > >>>>
> > > > > >>>> The challenge here is the following. Some of the Table API row
> > > > > >>>> windows
> > > > > >> as
> > > > > >>>> defined by FLIP-11 [1] are basically SQL OVER windows while
> > other
> > > > > >>>> cannot be easily expressed as such (TumbleRows for row-count
> > > > > >>>> intervals, SessionRows).
> > > > > >>>> However, since Calcite already supports SQL OVER windows, we
> can
> > > > > >>>> reuse
> > > > > >> the
> > > > > >>>> optimization logic for some of the Table API row windows. I
> also
> > > > > >>>> thought about the semantics of the TumbleRows and SessionRows
> > > > > >>>> windows as defined in
> > > > > >>>> FLIP-11 and came to the conclusion that these are not well
> > defined
> > > > > >>>> in
> > > > > >>>> FLIP-11 and should rather be defined as SlideRows windows
> with a
> > > > > >>>> special PARTITION BY clause.
> > > > > >>>>
> > > > > >>>> I propose to approach SQL OVER windows and Table API row
> windows
> > > as
> > > > > >>>> follows:
> > > > > >>>>
> > > > > >>>> We start with three simple cases for SQL OVER windows (not
> Table
> > > > > >>>> API
> > > > > >> yet):
> > > > > >>>>
> > > > > >>>> * OVER RANGE for event time
> > > > > >>>> * OVER RANGE for processing time
> > > > > >>>> * OVER ROW for processing time
> > > > > >>>>
> > > > > >>>> All cases fulfill the following restrictions:
> > > > > >>>> - All aggregations in SELECT must refer to the same window.
> > > > > >>>> - PARTITION BY may not contain the rowtime attribute.
> > > > > >>>> - ORDER BY must be on rowtime attribute (for event time) or
> on a
> > > > > >>>> marker function that indicates processing time. Additional
> sort
> > > > > >>>> attributes are not supported initially.
> > > > > >>>> - only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and
> > "BETWEEN
> > > x
> > > > > >>>> PRECEDING AND CURRENT ROW" are supported.
> > > > > >>>>
> > > > > >>>> OVER ROW for event time cannot be easily supported. With event
> > > > > >>>> time, we may have late records which need to be injected into
> > the
> > > > > >>>> order of records.
> > > > > >>>> When
> > > > > >>>> a record in injected in to the order where a row-count window
> > has
> > > > > >> already
> > > > > >>>> been computed, this and all following windows will change. We
> > > could
> > > > > >> either
> > > > > >>>> drop the record or sent out many retraction records. I think
> it
> > is
> > > > > >>>> best
> > > > > >> to
> > > > > >>>> not open this can of worms at this point.
> > > > > >>>>
> > > > > >>>> The rational for all of the above restrictions is to have
> first
> > > > > >>>> versions of OVER windows soon.
> > > > > >>>> Once we have the above cases covered we can extend and remove
> > > > > >> limitations
> > > > > >>>> as follows:
> > > > > >>>>
> > > > > >>>> - Table API SlideRow windows (with the same restrictions as
> > > above).
> > > > > >>>> This will be mostly API work since the execution part has been
> > > > solved
> > > > > before.
> > > > > >>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> > > > > >>>> - Add support for different windows in SELECT. All windows
> must
> > be
> > > > > >>>> partitioned and ordered in the same way.
> > > > > >>>> - Add support for additional ORDER BY attributes (besides
> time).
> > > > > >>>>
> > > > > >>>> As I said before, TumbleRows and SessionRows windows as in
> > FLIP-11
> > > > > >>>> are
> > > > > >> not
> > > > > >>>> well defined, IMO.
> > > > > >>>> They can be expressed as SlideRows windows with special
> > > > > >>>> partitioning (partitioning on fixed, non-overlapping time
> ranges
> > > > > >>>> for TumbleRows, and gap-separated, non-overlapping time ranges
> > for
> > > > > >>>> SessionRows) I would not start to work on those yet.
> > > > > >>>>
> > > > > >>>> I would like to close all related JIRA issues (FLINK-4678,
> > > > > >>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the
> > > development
> > > > > >>>> of these
> > > > > >> features
> > > > > >>>> as outlined above with corresponding JIRA issues.
> > > > > >>>>
> > > > > >>>> What do others think? (I cc'ed the contributors assigned to
> the
> > > > > >>>> above
> > > > > >> JIRA
> > > > > >>>> issues)
> > > > > >>>>
> > > > > >>>> Best, Fabian
> > > > > >>>>
> > > > > >>>> [1]
> > > > > >>>>
> > > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> > > > > >> 11%3A+Table+API+Stream+Aggregations
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

伍翀(云邪)
Hi Fabian,

I completely aggree with the six JIRAs and different runtime implementations.
And I also aggree with @shaoxuan's proposal can work for both processing time and event time.

Hi Shaoxuan,

I really like the idea you proposed that using retraction to decrease computation.
It's a great optimization for incremental aggregation (only one reduced value is kept).
But may not work for non-incremental aggregation (e.g. max, min, and median) which
needs to buffer all the records in the group & window, and recalculate all the records
when retraction happen. That means we will get a worse performance for non-incremental
 aggregations when using retraction optimization here.

IMO, we still need a general design for OVER window as following:

1. we buffer records in a list state (maybe sorted) for each group
2. when an over window is up to trigger, create an accumulator and accumulate all
    the records in the boundary of the list state.
3. emit the aggregate result and delete the accumulator.

And the retraction mechanism that keeps the accumulator for the whole life without deleting, could be implemented as an optimization on it for increamental aggregations.

Regards, Jark


> 在 2017年1月26日,上午11:42,Shaoxuan Wang <[hidden email]> 写道:
>
> Yes Fabian,
> I will complete my design with more thorough thoughts. BTW, I think the
> incremental aggregate (the key point I suggested is to eliminate state per
> each window) I proposed should work for both processing time and event
> time. It just does not need a sorted state for the processing time
> scenarios. (Need to verify).
>
> Regards,
> Shaoxuan
>
>
> On Wed, Jan 25, 2017 at 5:55 PM, Fabian Hueske <[hidden email]> wrote:
>
>> Hi everybody,
>>
>> thanks for the great discussions so far. It's awesome to see so much
>> interest in this topic!
>>
>> First, I'd like to comment on the development process for this feature and
>> later on the design of the runtime:
>>
>> Dev Process
>> ----
>> @Shaoxuan, I completely agree with you. We should first come up with good
>> designs for the runtime operators of the different window types. Once we
>> have that, we can start implementing the operators and integrate them with
>> Calcite's optimization. This will be an intermediate step and as a
>> byproduct give us support for SQL OVER windows. Once this is done, we can
>> extend the Table API and translate the Table API calls into the same
>> RelNodes as Calcite's SQL parser does.
>>
>> Runtime Design
>> ----
>> I think it makes sense to distinguish the different types of OVER windows
>> because they have different requirements which result in different runtime
>> implementations (with different implementation complexity and performance).
>> In a previous mail I proposed to split the support for OVER windows into
>> the following subtasks:
>>
>> # bounded PRECEDING
>> - OVER ROWS for processing time
>>  - does not require sorted state (data always arrives in processing time
>> order)
>>  - no need to consider retraction (processing time is never late)
>>  - defines windows on row count.
>>  - A GlobalWindow with evictor + trigger might be the best implementation
>> (basically the same as DataStream.countWindow(long, long). We need to add
>> timeouts to clean up state for non-used keys though.
>>
>> - OVER RANGE for processing time
>>  - does not require sorted state (data always arrives in processing time
>> order)
>>  - no need to consider retraction (processing time is never late)
>>  - defines windows on row count
>>  - I think this could also be implemented with a GlobalWindow with evictor
>> + trigger (need to verify)
>>
>> - OVER RANGE for event time
>>  - need for sorted state (late data possible)
>>  - IMO, a ProcessFunction gives us the most flexibility in adding later
>> features (retraction, update rate, etc.)
>>  - @Shaoxuan, you sketched a good design. Would you like to continue with
>> a design proposal?
>>
>> # UNBOUNDED PRECEDING
>> Similar considerations apply for the UNBOUNDED PRECEDING cases of the above
>> window types.
>>
>> If we all agree that the separation into six JIRAs (bounded/unbounded *
>> row-pt/range-pt/ range-et) makes sense, I would suggest to move the
>> discussions about the design of the implementation to the individual JIRAs.
>>
>> What do think?
>>
>> Best, Fabian
>>
>> 2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <[hidden email]>:
>>
>>> Hi Liuxinchun,
>>> I am not sure where did you get the inception: anyone has suggested "to
>>> process Event time window in Sliding Row Window". If you were referring
>> my
>>> post, there may be some misunderstanding there. I think you were asking
>> the
>>> similar question as Hongyuhong. I have just replied to him. Please take a
>>> look and let me know if that makes sense to you. "Retraction" is an
>>> important building block to compute correct incremental results in
>>> streaming. It is another big topic, we should discuss this in another
>>> thread.
>>>
>>> Regards,
>>> Shaoxuan
>>>
>>>
>>>
>>> On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <[hidden email]>
>> wrote:
>>>
>>>> I don't think it is a good idea to process Event time window in Sliding
>>>> Row Window. In Sliding Time window, when an element is late, we can
>>> trigger
>>>> the recalculation of the related windows. And the sliding period is
>>>> coarse-gained, We only need to recalculate size/sliding number of
>>> windows.
>>>> But in Sliding Row Window, the calculation is triggered when every
>>> element
>>>> is coming. The sliding period is becoming fine-gained. When an element
>> is
>>>> late, there are so many "windows" are influenced. Even if we store all
>>> the
>>>> raw data, the computation is very large.
>>>>
>>>> I think if it is possible to set a standard to sliding Event Time Row
>>>> Window, When certain elements are late, we can only recalculate partial
>>>> windows and permit some error. For example, we can only recalculate the
>>>> windows end in range between (lateElement.timestamp - leftDelta,
>>>> lateElement.timestamp] and those windows begin in range between
>>>> [lateElement.timestamp, lateElement.timestamp + rightDelta).
>>>> ////////////////////////////////////////////////////////////
>>>> //////////////////////////
>>>> Hi everyone,
>>>> Thanks for this great discussion, and glad to see more and more people
>>> are
>>>> interested on stream SQL & tableAPI.
>>>>
>>>> IMO, the key problems for Over window design are the SQL semantics and
>>> the
>>>> runtime design. I totally agree with Fabian that we should skip the
>>> design
>>>> of TumbleRows and SessionRows windows for now, as they are not well
>>> defined
>>>> in SQL semantics.
>>>>
>>>> Runtime design is the most crucial part we are interested in and
>>>> volunteered to contribute into. We have thousands of machines running
>>> flink
>>>> streaming jobs. The costs in terms of CPU, memory, and state are the
>>> vital
>>>> factors that we have to taken into account. We have been working on the
>>>> design of OVER window in the past months, and planning to send out a
>>>> detailed design doc to DEV quite soon. But since Fabian started a good
>>>> discussion on OVER window, I would like to share our ideas/thoughts
>> about
>>>> the runtime design for OVER window.
>>>>
>>>>   1. As SunJincheng pointed out earlier, sliding window does not work
>>> for
>>>>   unbounded preceding, we need alternative approach for unbound over
>>>> window.
>>>>   2. Though sliding window may work for some cases of bounded window,
>>>>   it is not very efficient thereby should not be used for production.
>> To
>>>> the
>>>>   best of my understanding, the current runtime implementation of
>>> sliding
>>>>   window has not leveraged the concepts of state Panes yet. This means
>>>> that
>>>>   if we use sliding window for OVER window,  there will be a backend
>>> state
>>>>   created per each group (partition by) and each row, and whenever a
>> new
>>>>   record arrives, it will be accumulated to all the existing windows
>>> that
>>>> has
>>>>   not been closed. This would cause quite a lot of overhead in terms
>> of
>>>> both
>>>>   CPU and memory&state.
>>>>   3. Fabian has mentioned an approach of leveraging “ProcessFunction”
>>> and
>>>>   a “sortedState”. I like this idea. The design details on this are
>> not
>>>> quite
>>>>   clear yet. So I would like to add more thoughts on this. Regardless
>>>>   which dataStream API we are going to use (it is very likely that we
>>> need
>>>>   a new API), we should come out with an optimal approach. The purpose
>>> of
>>>>   grouping window and over window is to partition the data, such that
>> we
>>>> can
>>>>   generate the aggregate results. So when we talk about the design of
>>> OVER
>>>>   window, we have to think about the aggregates. As we proposed in our
>>>> recent
>>>>   UDAGG doc https://goo.gl/6ntclB,  the user defined accumulator will
>>> be
>>>>   stored in the aggregate state. Besides accumulator, we have also
>>>> introduced
>>>>   a retract API for UDAGG. With aggregate accumulator and retract
>> API, I
>>>> am
>>>>   proposing a runtime approach to implement the OVER window as
>>> followings.
>>>>   4.
>>>>      - We first implement a sorted state interface
>>>>      - Per each group, we just create one sorted state. When a new
>>> record
>>>>      arrives, it will insert into this sorted state, in the meanwhile
>> it
>>>> will be
>>>>      accumulated to the aggregate accumulator.
>>>>      - For over window, we keep the aggregate accumulator for the
>> entire
>>>>      job lifelong time. This is different than the case where we
>> delete
>>>> the
>>>>      accumulator for each group/window when a grouping-window is
>>> finished.
>>>>      - When an over window is up to trigger, we grab the
>>>>      previous accumulator from the state and accumulate values onto it
>>>> with all
>>>>      the records till the upperBoundary of the current window, and
>>>> retract all
>>>>      the out of scope records till its lowerBoundary. We emit the
>>>>      aggregate result and save the accumulator for the next window.
>>>>
>>>>
>>>> Hello Fabian,
>>>> I would suggest we should first start working on runtime design of over
>>>> window and aggregate. Once we have a good design there, one can easily
>>> add
>>>> the support for SQL as well as tableAPI. What do you think?
>>>>
>>>> Regards,
>>>> Shaoxuan
>>>>
>>>> On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <[hidden email]>
>>> wrote:
>>>>
>>>>> Hi Radu,
>>>>>
>>>>> thanks for your comments!
>>>>>
>>>>> Yes, my intention is to open new JIRA issues to structure the
>>>>> development process. Everybody is very welcome to pick up issues and
>>>>> discuss the design proposals.
>>>>> At the moment I see the following six issues to start with:
>>>>>
>>>>> - streaming SQL OVER ROW for processing time
>>>>>  - bounded PRECEDING
>>>>>  - unbounded PRECEDING
>>>>>
>>>>> - streaming SQL OVER RANGE for processing time
>>>>>  - bounded PRECEDING
>>>>>  - unbounded PRECEDING
>>>>>
>>>>> - streaming SQL OVER RANGE for event time
>>>>>  - bounded PRECEDING
>>>>>  - unbounded PRECEDING
>>>>>
>>>>> For each of these windows we need corresponding translation rules and
>>>>> execution code.
>>>>>
>>>>> Subsequent JIRAs would be
>>>>> - extending the Table API for supported SQL windows
>>>>> - add support for FOLLOWING
>>>>> - etc.
>>>>>
>>>>> Regarding the requirement for a sorted state. I am not sure if the
>>>>> OVER windows should be implemented using Flink's DataStream window
>>>> framework.
>>>>> We need a good design document to figure out what is the best
>>>>> approach. A ProcessFunction with a sorted state might be a good
>>> solution
>>>> as well.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>>
>>>>> 2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]>:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Thanks for starting these discussion - it is very useful.
>>>>>> It does make sense indeed to refactor all these and coordinate a
>> bit
>>>>>> the efforts not to have overlapping implementations and
>> incompatible
>>>>> solutions.
>>>>>>
>>>>>> If you close the 3 jira issues you mentioned - do you plan to
>>>>>> redesign them and open new ones? Do you need help from our side -
>> we
>>>>>> can also pick the redesign of some of these new jira issues. For
>>>>>> example we already
>>>>> have
>>>>>> an implementation for this and we can help with the design.
>>>>>> Nevertheless, let's coordinate the effort.
>>>>>>
>>>>>> Regarding the support for the different types of window - I think
>>>>>> the
>>>>> best
>>>>>> option is to split the implementation in small units. We can easily
>>>>>> do
>>>>> this
>>>>>> from the transformation rule class and with this each particular
>>>>>> type of window (session/sliding/sliderows/processing time/...)
>> will
>>>>>> have a clear implementation and a corresponding architecture within
>>>> the jira issue?
>>>>> What
>>>>>> do you think about such a granularity?
>>>>>>
>>>>>> Regarding the issue of " Q4: The implementaion of SlideRows still
>>>>>> need a custom operator that collects records in a priority queue
>>>>>> ordered by the "rowtime", which is similar to the design we
>>>>>> discussed in FLINK-4697, right? "
>>>>>> Why would you need this operator? The window buffer can act to some
>>>>> extent
>>>>>> as a priority queue as long as the trigger and evictor is set to
>>>>>> work
>>>>> based
>>>>>> on the rowtime - or maybe I am missing something... Can you please
>>>>> clarify
>>>>>> this.
>>>>>>
>>>>>>
>>>>>> Dr. Radu Tudoran
>>>>>> Senior Research Engineer - Big Data Expert IT R&D Division
>>>>>>
>>>>>>
>>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>>>> European Research Center
>>>>>> Riesstrasse 25, 80992 München
>>>>>>
>>>>>> E-mail: [hidden email]
>>>>>> Mobile: +49 15209084330
>>>>>> Telephone: +49 891588344173
>>>>>>
>>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB
>> 56063,
>>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
>>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB
>> 56063,
>>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
>>>>>> This e-mail and its attachments contain confidential information
>> from
>>>>>> HUAWEI, which is intended only for the person or entity whose
>> address
>>>> is
>>>>>> listed above. Any use of the information contained herein in any
>> way
>>>>>> (including, but not limited to, total or partial disclosure,
>>>>> reproduction,
>>>>>> or dissemination) by persons other than the intended recipient(s)
>> is
>>>>>> prohibited. If you receive this e-mail in error, please notify the
>>>> sender
>>>>>> by phone or email immediately and delete it!
>>>>>>
>>>>>>
>>>>>> -----Original Message-----
>>>>>> From: Jark Wu [mailto:[hidden email]]
>>>>>> Sent: Tuesday, January 24, 2017 6:53 AM
>>>>>> To: [hidden email]
>>>>>> Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
>>> Windows
>>>>> for
>>>>>> streaming tables
>>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> Thanks for bringing up this discussion and the nice approach to
>> avoid
>>>>>> overlapping contributions.
>>>>>>
>>>>>> All of these make sense to me. But I have some questions.
>>>>>>
>>>>>> Q1: If I understand correctly, we will not support TumbleRows and
>>>>>> SessionRows at the beginning. But maybe support them as a syntax
>>> sugar
>>>>> (in
>>>>>> Table API) when the SlideRows is supported in the future. Right ?
>>>>>>
>>>>>> Q2: How to support SessionRows based on SlideRows ?  I don't get
>> how
>>> to
>>>>>> partition on "gap-separated".
>>>>>>
>>>>>> Q3: Should we break down the approach into smaller tasks for
>>> streaming
>>>>>> tables and batch tables ?
>>>>>>
>>>>>> Q4: The implementaion of SlideRows still need a custom operator
>> that
>>>>>> collects records in a priority queue ordered by the "rowtime",
>> which
>>> is
>>>>>> similar to the design we discussed in FLINK-4697, right?
>>>>>>
>>>>>> +1 not support for OVER ROW for event time at this point.
>>>>>>
>>>>>> Regards, Jark
>>>>>>
>>>>>>
>>>>>>> 在 2017年1月24日,上午10:28,Hongyuhong <[hidden email]> 写道:
>>>>>>>
>>>>>>> Hi,
>>>>>>> We are also interested in streaming sql and very willing to
>>>> participate
>>>>>> and contribute.
>>>>>>>
>>>>>>> We are now in progress and we will also contribute to calcite to
>>> push
>>>>>> forward the window and stream-join support.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --------------
>>>>>>> Sender: Fabian Hueske [mailto:[hidden email]] Send Time:
>>>> 2017年1月24日
>>>>>>> 5:55
>>>>>>> Receiver: [hidden email]
>>>>>>> Theme: Re: [DISCUSS] Development of SQL OVER / Table API Row
>>> Windows
>>>>>>> for streaming tables
>>>>>>>
>>>>>>> Hi Haohui,
>>>>>>>
>>>>>>> our plan was in fact to piggy-back on Calcite and use the TUMBLE
>>>>>> function [1] once is it is available (CALCITE-1345 [2]).
>>>>>>> Unfortunately, this issue does not seem to be very active, so I
>>> don't
>>>>>> know what the progress is.
>>>>>>>
>>>>>>> I would suggest to move the discussion about group windows to a
>>>>> separate
>>>>>> thread and keep this one focused on the organization of the SQL
>> OVER
>>>>>> windows.
>>>>>>>
>>>>>>> Best,
>>>>>>> Fabian
>>>>>>>
>>>>>>> [1] http://calcite.apache.org/docs/stream.html)
>>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345
>>>>>>>
>>>>>>> 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
>>>>>>>
>>>>>>>> Hi Fabian,
>>>>>>>>
>>>>>>>> FLINK-4692 has added the support for tumbling window and we are
>>>>>>>> excited to try it out and expose it as a SQL construct.
>>>>>>>>
>>>>>>>> Just curious -- what's your thought on the SQL syntax on
>> tumbling
>>>>>> window?
>>>>>>>>
>>>>>>>> Implementation wise it might make sense to think tumbling window
>>> as
>>>> a
>>>>>>>> special case of the sliding window.
>>>>>>>>
>>>>>>>> The problem I see is that the OVER construct might be
>> insufficient
>>>> to
>>>>>>>> support all the use cases of tumbling windows. For example, it
>>> fails
>>>>>>>> to express tumbling windows that have fractional time units (as
>>>>>>>> pointed out in http://calcite.apache.org/docs/stream.html).
>>>>>>>>
>>>>>>>> It looks to me that the Calcite / Azure Stream Analytics have
>>>>>>>> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address
>>> this
>>>>>> issue.
>>>>>>>>
>>>>>>>> Do you think it is a good idea to follow the same conventions?
>>> Your
>>>>>>>> ideas are appreciated.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Haohui
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <[hidden email]>
>>>>> wrote:
>>>>>>>>
>>>>>>>>> +1
>>>>>>>>>
>>>>>>>>> We are also quite interested in these features and would love
>> to
>>>>>>>>> participate and contribute.
>>>>>>>>>
>>>>>>>>> ~Haohui
>>>>>>>>>
>>>>>>>>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <
>> [hidden email]
>>>>
>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi everybody,
>>>>>>>>>>
>>>>>>>>>> it seems that currently several contributors are working on
>> new
>>>>>>>>>> features for the streaming Table API / SQL around row windows
>>> (as
>>>>>>>>>> defined in
>>>>>>>>>> FLIP-11
>>>>>>>>>> [1]) and SQL OVER-style window (FLINK-4678, FLINK-4679,
>>>> FLINK-4680,
>>>>>>>>>> FLINK-5584).
>>>>>>>>>> Since these efforts overlap quite a bit I spent some time
>>> thinking
>>>>>>>>>> about how we can approach these features and how to avoid
>>>>>>>>>> overlapping contributions.
>>>>>>>>>>
>>>>>>>>>> The challenge here is the following. Some of the Table API row
>>>>>>>>>> windows
>>>>>>>> as
>>>>>>>>>> defined by FLIP-11 [1] are basically SQL OVER windows while
>>> other
>>>>>>>>>> cannot be easily expressed as such (TumbleRows for row-count
>>>>>>>>>> intervals, SessionRows).
>>>>>>>>>> However, since Calcite already supports SQL OVER windows, we
>> can
>>>>>>>>>> reuse
>>>>>>>> the
>>>>>>>>>> optimization logic for some of the Table API row windows. I
>> also
>>>>>>>>>> thought about the semantics of the TumbleRows and SessionRows
>>>>>>>>>> windows as defined in
>>>>>>>>>> FLIP-11 and came to the conclusion that these are not well
>>> defined
>>>>>>>>>> in
>>>>>>>>>> FLIP-11 and should rather be defined as SlideRows windows
>> with a
>>>>>>>>>> special PARTITION BY clause.
>>>>>>>>>>
>>>>>>>>>> I propose to approach SQL OVER windows and Table API row
>> windows
>>>> as
>>>>>>>>>> follows:
>>>>>>>>>>
>>>>>>>>>> We start with three simple cases for SQL OVER windows (not
>> Table
>>>>>>>>>> API
>>>>>>>> yet):
>>>>>>>>>>
>>>>>>>>>> * OVER RANGE for event time
>>>>>>>>>> * OVER RANGE for processing time
>>>>>>>>>> * OVER ROW for processing time
>>>>>>>>>>
>>>>>>>>>> All cases fulfill the following restrictions:
>>>>>>>>>> - All aggregations in SELECT must refer to the same window.
>>>>>>>>>> - PARTITION BY may not contain the rowtime attribute.
>>>>>>>>>> - ORDER BY must be on rowtime attribute (for event time) or
>> on a
>>>>>>>>>> marker function that indicates processing time. Additional
>> sort
>>>>>>>>>> attributes are not supported initially.
>>>>>>>>>> - only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and
>>> "BETWEEN
>>>> x
>>>>>>>>>> PRECEDING AND CURRENT ROW" are supported.
>>>>>>>>>>
>>>>>>>>>> OVER ROW for event time cannot be easily supported. With event
>>>>>>>>>> time, we may have late records which need to be injected into
>>> the
>>>>>>>>>> order of records.
>>>>>>>>>> When
>>>>>>>>>> a record in injected in to the order where a row-count window
>>> has
>>>>>>>> already
>>>>>>>>>> been computed, this and all following windows will change. We
>>>> could
>>>>>>>> either
>>>>>>>>>> drop the record or sent out many retraction records. I think
>> it
>>> is
>>>>>>>>>> best
>>>>>>>> to
>>>>>>>>>> not open this can of worms at this point.
>>>>>>>>>>
>>>>>>>>>> The rational for all of the above restrictions is to have
>> first
>>>>>>>>>> versions of OVER windows soon.
>>>>>>>>>> Once we have the above cases covered we can extend and remove
>>>>>>>> limitations
>>>>>>>>>> as follows:
>>>>>>>>>>
>>>>>>>>>> - Table API SlideRow windows (with the same restrictions as
>>>> above).
>>>>>>>>>> This will be mostly API work since the execution part has been
>>>>> solved
>>>>>> before.
>>>>>>>>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
>>>>>>>>>> - Add support for different windows in SELECT. All windows
>> must
>>> be
>>>>>>>>>> partitioned and ordered in the same way.
>>>>>>>>>> - Add support for additional ORDER BY attributes (besides
>> time).
>>>>>>>>>>
>>>>>>>>>> As I said before, TumbleRows and SessionRows windows as in
>>> FLIP-11
>>>>>>>>>> are
>>>>>>>> not
>>>>>>>>>> well defined, IMO.
>>>>>>>>>> They can be expressed as SlideRows windows with special
>>>>>>>>>> partitioning (partitioning on fixed, non-overlapping time
>> ranges
>>>>>>>>>> for TumbleRows, and gap-separated, non-overlapping time ranges
>>> for
>>>>>>>>>> SessionRows) I would not start to work on those yet.
>>>>>>>>>>
>>>>>>>>>> I would like to close all related JIRA issues (FLINK-4678,
>>>>>>>>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the
>>>> development
>>>>>>>>>> of these
>>>>>>>> features
>>>>>>>>>> as outlined above with corresponding JIRA issues.
>>>>>>>>>>
>>>>>>>>>> What do others think? (I cc'ed the contributors assigned to
>> the
>>>>>>>>>> above
>>>>>>>> JIRA
>>>>>>>>>> issues)
>>>>>>>>>>
>>>>>>>>>> Best, Fabian
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
>>>>>>>> 11%3A+Table+API+Stream+Aggregations
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Fabian Hueske-2
Hi everybody,

I created the following JIRAs:

- FLINK-5653: processing time OVER ROWS x PRECEDING
- FLINK-5654: processing time OVER RANGE x PRECEDING
- FLINK-5655: event time OVER RANGE x PRECEDING

- FLINK-5656: processing time OVER ROWS UNBOUNDED PRECEDING
- FLINK-5657: processing time OVER RANGE UNBOUNDED PRECEDING
- FLINK-5658: event time OVER RANGE UNBOUNDED PRECEDING

Let's move the discussions about the design of the runtime operators to
these issues.

Since some of you have already started working on some of the issues, it
would be good if you could pick the ones you plan to work on.
If there are overlapping interests, it would be great to collaborate, e.g,
design, coding, testing, code review.

A few more comments:

@Shaoxuan: You are right, we can implement the processing time windows with
ProcessFunction as well. A GlobalWindow is essentially a FIFO queue of
arriving records. With custom triggers and evictors, we could implement the
functionality of the processing time OVER windows. We could ask Aljoscha
(he knows every detail of Flink's window framework) if a ProcessFunction
has more optimization potential than a GlobalWindow.

@Jark: That's a good point. We need logic to compute non-retractable
aggregation functions as well.

@Radu: So far we had very coarse grained DataStreamRelNodes (e.g.,
DataStreamAggregate implements tumbling, sliding, and session windows for
processing and event time). However, it might make sense to start
implementing more fine-grained DataStreamRelNodes.

I'll go ahead and modify the previous JIRAs about SlideRow, TumbleRow, and
SessionRow to explicitly address the Table API and how they relate to the
new JIRAs.

Best,
Fabian

2017-01-26 7:05 GMT+01:00 Jark Wu <[hidden email]>:

> Hi Fabian,
>
> I completely aggree with the six JIRAs and different runtime
> implementations.
> And I also aggree with @shaoxuan's proposal can work for both processing
> time and event time.
>
> Hi Shaoxuan,
>
> I really like the idea you proposed that using retraction to decrease
> computation.
> It's a great optimization for incremental aggregation (only one reduced
> value is kept).
> But may not work for non-incremental aggregation (e.g. max, min, and
> median) which
> needs to buffer all the records in the group & window, and recalculate all
> the records
> when retraction happen. That means we will get a worse performance for
> non-incremental
>  aggregations when using retraction optimization here.
>
> IMO, we still need a general design for OVER window as following:
>
> 1. we buffer records in a list state (maybe sorted) for each group
> 2. when an over window is up to trigger, create an accumulator and
> accumulate all
>     the records in the boundary of the list state.
> 3. emit the aggregate result and delete the accumulator.
>
> And the retraction mechanism that keeps the accumulator for the whole life
> without deleting, could be implemented as an optimization on it for
> increamental aggregations.
>
> Regards, Jark
>
>
> > 在 2017年1月26日,上午11:42,Shaoxuan Wang <[hidden email]> 写道:
> >
> > Yes Fabian,
> > I will complete my design with more thorough thoughts. BTW, I think the
> > incremental aggregate (the key point I suggested is to eliminate state
> per
> > each window) I proposed should work for both processing time and event
> > time. It just does not need a sorted state for the processing time
> > scenarios. (Need to verify).
> >
> > Regards,
> > Shaoxuan
> >
> >
> > On Wed, Jan 25, 2017 at 5:55 PM, Fabian Hueske <[hidden email]>
> wrote:
> >
> >> Hi everybody,
> >>
> >> thanks for the great discussions so far. It's awesome to see so much
> >> interest in this topic!
> >>
> >> First, I'd like to comment on the development process for this feature
> and
> >> later on the design of the runtime:
> >>
> >> Dev Process
> >> ----
> >> @Shaoxuan, I completely agree with you. We should first come up with
> good
> >> designs for the runtime operators of the different window types. Once we
> >> have that, we can start implementing the operators and integrate them
> with
> >> Calcite's optimization. This will be an intermediate step and as a
> >> byproduct give us support for SQL OVER windows. Once this is done, we
> can
> >> extend the Table API and translate the Table API calls into the same
> >> RelNodes as Calcite's SQL parser does.
> >>
> >> Runtime Design
> >> ----
> >> I think it makes sense to distinguish the different types of OVER
> windows
> >> because they have different requirements which result in different
> runtime
> >> implementations (with different implementation complexity and
> performance).
> >> In a previous mail I proposed to split the support for OVER windows into
> >> the following subtasks:
> >>
> >> # bounded PRECEDING
> >> - OVER ROWS for processing time
> >>  - does not require sorted state (data always arrives in processing time
> >> order)
> >>  - no need to consider retraction (processing time is never late)
> >>  - defines windows on row count.
> >>  - A GlobalWindow with evictor + trigger might be the best
> implementation
> >> (basically the same as DataStream.countWindow(long, long). We need to
> add
> >> timeouts to clean up state for non-used keys though.
> >>
> >> - OVER RANGE for processing time
> >>  - does not require sorted state (data always arrives in processing time
> >> order)
> >>  - no need to consider retraction (processing time is never late)
> >>  - defines windows on row count
> >>  - I think this could also be implemented with a GlobalWindow with
> evictor
> >> + trigger (need to verify)
> >>
> >> - OVER RANGE for event time
> >>  - need for sorted state (late data possible)
> >>  - IMO, a ProcessFunction gives us the most flexibility in adding later
> >> features (retraction, update rate, etc.)
> >>  - @Shaoxuan, you sketched a good design. Would you like to continue
> with
> >> a design proposal?
> >>
> >> # UNBOUNDED PRECEDING
> >> Similar considerations apply for the UNBOUNDED PRECEDING cases of the
> above
> >> window types.
> >>
> >> If we all agree that the separation into six JIRAs (bounded/unbounded *
> >> row-pt/range-pt/ range-et) makes sense, I would suggest to move the
> >> discussions about the design of the implementation to the individual
> JIRAs.
> >>
> >> What do think?
> >>
> >> Best, Fabian
> >>
> >> 2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <[hidden email]>:
> >>
> >>> Hi Liuxinchun,
> >>> I am not sure where did you get the inception: anyone has suggested "to
> >>> process Event time window in Sliding Row Window". If you were referring
> >> my
> >>> post, there may be some misunderstanding there. I think you were asking
> >> the
> >>> similar question as Hongyuhong. I have just replied to him. Please
> take a
> >>> look and let me know if that makes sense to you. "Retraction" is an
> >>> important building block to compute correct incremental results in
> >>> streaming. It is another big topic, we should discuss this in another
> >>> thread.
> >>>
> >>> Regards,
> >>> Shaoxuan
> >>>
> >>>
> >>>
> >>> On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <[hidden email]>
> >> wrote:
> >>>
> >>>> I don't think it is a good idea to process Event time window in
> Sliding
> >>>> Row Window. In Sliding Time window, when an element is late, we can
> >>> trigger
> >>>> the recalculation of the related windows. And the sliding period is
> >>>> coarse-gained, We only need to recalculate size/sliding number of
> >>> windows.
> >>>> But in Sliding Row Window, the calculation is triggered when every
> >>> element
> >>>> is coming. The sliding period is becoming fine-gained. When an element
> >> is
> >>>> late, there are so many "windows" are influenced. Even if we store all
> >>> the
> >>>> raw data, the computation is very large.
> >>>>
> >>>> I think if it is possible to set a standard to sliding Event Time Row
> >>>> Window, When certain elements are late, we can only recalculate
> partial
> >>>> windows and permit some error. For example, we can only recalculate
> the
> >>>> windows end in range between (lateElement.timestamp - leftDelta,
> >>>> lateElement.timestamp] and those windows begin in range between
> >>>> [lateElement.timestamp, lateElement.timestamp + rightDelta).
> >>>> ////////////////////////////////////////////////////////////
> >>>> //////////////////////////
> >>>> Hi everyone,
> >>>> Thanks for this great discussion, and glad to see more and more people
> >>> are
> >>>> interested on stream SQL & tableAPI.
> >>>>
> >>>> IMO, the key problems for Over window design are the SQL semantics and
> >>> the
> >>>> runtime design. I totally agree with Fabian that we should skip the
> >>> design
> >>>> of TumbleRows and SessionRows windows for now, as they are not well
> >>> defined
> >>>> in SQL semantics.
> >>>>
> >>>> Runtime design is the most crucial part we are interested in and
> >>>> volunteered to contribute into. We have thousands of machines running
> >>> flink
> >>>> streaming jobs. The costs in terms of CPU, memory, and state are the
> >>> vital
> >>>> factors that we have to taken into account. We have been working on
> the
> >>>> design of OVER window in the past months, and planning to send out a
> >>>> detailed design doc to DEV quite soon. But since Fabian started a good
> >>>> discussion on OVER window, I would like to share our ideas/thoughts
> >> about
> >>>> the runtime design for OVER window.
> >>>>
> >>>>   1. As SunJincheng pointed out earlier, sliding window does not work
> >>> for
> >>>>   unbounded preceding, we need alternative approach for unbound over
> >>>> window.
> >>>>   2. Though sliding window may work for some cases of bounded window,
> >>>>   it is not very efficient thereby should not be used for production.
> >> To
> >>>> the
> >>>>   best of my understanding, the current runtime implementation of
> >>> sliding
> >>>>   window has not leveraged the concepts of state Panes yet. This means
> >>>> that
> >>>>   if we use sliding window for OVER window,  there will be a backend
> >>> state
> >>>>   created per each group (partition by) and each row, and whenever a
> >> new
> >>>>   record arrives, it will be accumulated to all the existing windows
> >>> that
> >>>> has
> >>>>   not been closed. This would cause quite a lot of overhead in terms
> >> of
> >>>> both
> >>>>   CPU and memory&state.
> >>>>   3. Fabian has mentioned an approach of leveraging “ProcessFunction”
> >>> and
> >>>>   a “sortedState”. I like this idea. The design details on this are
> >> not
> >>>> quite
> >>>>   clear yet. So I would like to add more thoughts on this. Regardless
> >>>>   which dataStream API we are going to use (it is very likely that we
> >>> need
> >>>>   a new API), we should come out with an optimal approach. The purpose
> >>> of
> >>>>   grouping window and over window is to partition the data, such that
> >> we
> >>>> can
> >>>>   generate the aggregate results. So when we talk about the design of
> >>> OVER
> >>>>   window, we have to think about the aggregates. As we proposed in our
> >>>> recent
> >>>>   UDAGG doc https://goo.gl/6ntclB,  the user defined accumulator will
> >>> be
> >>>>   stored in the aggregate state. Besides accumulator, we have also
> >>>> introduced
> >>>>   a retract API for UDAGG. With aggregate accumulator and retract
> >> API, I
> >>>> am
> >>>>   proposing a runtime approach to implement the OVER window as
> >>> followings.
> >>>>   4.
> >>>>      - We first implement a sorted state interface
> >>>>      - Per each group, we just create one sorted state. When a new
> >>> record
> >>>>      arrives, it will insert into this sorted state, in the meanwhile
> >> it
> >>>> will be
> >>>>      accumulated to the aggregate accumulator.
> >>>>      - For over window, we keep the aggregate accumulator for the
> >> entire
> >>>>      job lifelong time. This is different than the case where we
> >> delete
> >>>> the
> >>>>      accumulator for each group/window when a grouping-window is
> >>> finished.
> >>>>      - When an over window is up to trigger, we grab the
> >>>>      previous accumulator from the state and accumulate values onto it
> >>>> with all
> >>>>      the records till the upperBoundary of the current window, and
> >>>> retract all
> >>>>      the out of scope records till its lowerBoundary. We emit the
> >>>>      aggregate result and save the accumulator for the next window.
> >>>>
> >>>>
> >>>> Hello Fabian,
> >>>> I would suggest we should first start working on runtime design of
> over
> >>>> window and aggregate. Once we have a good design there, one can easily
> >>> add
> >>>> the support for SQL as well as tableAPI. What do you think?
> >>>>
> >>>> Regards,
> >>>> Shaoxuan
> >>>>
> >>>> On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <[hidden email]>
> >>> wrote:
> >>>>
> >>>>> Hi Radu,
> >>>>>
> >>>>> thanks for your comments!
> >>>>>
> >>>>> Yes, my intention is to open new JIRA issues to structure the
> >>>>> development process. Everybody is very welcome to pick up issues and
> >>>>> discuss the design proposals.
> >>>>> At the moment I see the following six issues to start with:
> >>>>>
> >>>>> - streaming SQL OVER ROW for processing time
> >>>>>  - bounded PRECEDING
> >>>>>  - unbounded PRECEDING
> >>>>>
> >>>>> - streaming SQL OVER RANGE for processing time
> >>>>>  - bounded PRECEDING
> >>>>>  - unbounded PRECEDING
> >>>>>
> >>>>> - streaming SQL OVER RANGE for event time
> >>>>>  - bounded PRECEDING
> >>>>>  - unbounded PRECEDING
> >>>>>
> >>>>> For each of these windows we need corresponding translation rules and
> >>>>> execution code.
> >>>>>
> >>>>> Subsequent JIRAs would be
> >>>>> - extending the Table API for supported SQL windows
> >>>>> - add support for FOLLOWING
> >>>>> - etc.
> >>>>>
> >>>>> Regarding the requirement for a sorted state. I am not sure if the
> >>>>> OVER windows should be implemented using Flink's DataStream window
> >>>> framework.
> >>>>> We need a good design document to figure out what is the best
> >>>>> approach. A ProcessFunction with a sorted state might be a good
> >>> solution
> >>>> as well.
> >>>>>
> >>>>> Best, Fabian
> >>>>>
> >>>>>
> >>>>> 2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]>:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> Thanks for starting these discussion - it is very useful.
> >>>>>> It does make sense indeed to refactor all these and coordinate a
> >> bit
> >>>>>> the efforts not to have overlapping implementations and
> >> incompatible
> >>>>> solutions.
> >>>>>>
> >>>>>> If you close the 3 jira issues you mentioned - do you plan to
> >>>>>> redesign them and open new ones? Do you need help from our side -
> >> we
> >>>>>> can also pick the redesign of some of these new jira issues. For
> >>>>>> example we already
> >>>>> have
> >>>>>> an implementation for this and we can help with the design.
> >>>>>> Nevertheless, let's coordinate the effort.
> >>>>>>
> >>>>>> Regarding the support for the different types of window - I think
> >>>>>> the
> >>>>> best
> >>>>>> option is to split the implementation in small units. We can easily
> >>>>>> do
> >>>>> this
> >>>>>> from the transformation rule class and with this each particular
> >>>>>> type of window (session/sliding/sliderows/processing time/...)
> >> will
> >>>>>> have a clear implementation and a corresponding architecture within
> >>>> the jira issue?
> >>>>> What
> >>>>>> do you think about such a granularity?
> >>>>>>
> >>>>>> Regarding the issue of " Q4: The implementaion of SlideRows still
> >>>>>> need a custom operator that collects records in a priority queue
> >>>>>> ordered by the "rowtime", which is similar to the design we
> >>>>>> discussed in FLINK-4697, right? "
> >>>>>> Why would you need this operator? The window buffer can act to some
> >>>>> extent
> >>>>>> as a priority queue as long as the trigger and evictor is set to
> >>>>>> work
> >>>>> based
> >>>>>> on the rowtime - or maybe I am missing something... Can you please
> >>>>> clarify
> >>>>>> this.
> >>>>>>
> >>>>>>
> >>>>>> Dr. Radu Tudoran
> >>>>>> Senior Research Engineer - Big Data Expert IT R&D Division
> >>>>>>
> >>>>>>
> >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> >>>>>> European Research Center
> >>>>>> Riesstrasse 25, 80992 München
> >>>>>>
> >>>>>> E-mail: [hidden email]
> >>>>>> Mobile: +49 15209084330
> >>>>>> Telephone: +49 891588344173
> >>>>>>
> >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> >>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> >>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB
> >> 56063,
> >>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> >>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB
> >> 56063,
> >>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> >>>>>> This e-mail and its attachments contain confidential information
> >> from
> >>>>>> HUAWEI, which is intended only for the person or entity whose
> >> address
> >>>> is
> >>>>>> listed above. Any use of the information contained herein in any
> >> way
> >>>>>> (including, but not limited to, total or partial disclosure,
> >>>>> reproduction,
> >>>>>> or dissemination) by persons other than the intended recipient(s)
> >> is
> >>>>>> prohibited. If you receive this e-mail in error, please notify the
> >>>> sender
> >>>>>> by phone or email immediately and delete it!
> >>>>>>
> >>>>>>
> >>>>>> -----Original Message-----
> >>>>>> From: Jark Wu [mailto:[hidden email]]
> >>>>>> Sent: Tuesday, January 24, 2017 6:53 AM
> >>>>>> To: [hidden email]
> >>>>>> Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
> >>> Windows
> >>>>> for
> >>>>>> streaming tables
> >>>>>>
> >>>>>> Hi Fabian,
> >>>>>>
> >>>>>> Thanks for bringing up this discussion and the nice approach to
> >> avoid
> >>>>>> overlapping contributions.
> >>>>>>
> >>>>>> All of these make sense to me. But I have some questions.
> >>>>>>
> >>>>>> Q1: If I understand correctly, we will not support TumbleRows and
> >>>>>> SessionRows at the beginning. But maybe support them as a syntax
> >>> sugar
> >>>>> (in
> >>>>>> Table API) when the SlideRows is supported in the future. Right ?
> >>>>>>
> >>>>>> Q2: How to support SessionRows based on SlideRows ?  I don't get
> >> how
> >>> to
> >>>>>> partition on "gap-separated".
> >>>>>>
> >>>>>> Q3: Should we break down the approach into smaller tasks for
> >>> streaming
> >>>>>> tables and batch tables ?
> >>>>>>
> >>>>>> Q4: The implementaion of SlideRows still need a custom operator
> >> that
> >>>>>> collects records in a priority queue ordered by the "rowtime",
> >> which
> >>> is
> >>>>>> similar to the design we discussed in FLINK-4697, right?
> >>>>>>
> >>>>>> +1 not support for OVER ROW for event time at this point.
> >>>>>>
> >>>>>> Regards, Jark
> >>>>>>
> >>>>>>
> >>>>>>> 在 2017年1月24日,上午10:28,Hongyuhong <[hidden email]> 写道:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>> We are also interested in streaming sql and very willing to
> >>>> participate
> >>>>>> and contribute.
> >>>>>>>
> >>>>>>> We are now in progress and we will also contribute to calcite to
> >>> push
> >>>>>> forward the window and stream-join support.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --------------
> >>>>>>> Sender: Fabian Hueske [mailto:[hidden email]] Send Time:
> >>>> 2017年1月24日
> >>>>>>> 5:55
> >>>>>>> Receiver: [hidden email]
> >>>>>>> Theme: Re: [DISCUSS] Development of SQL OVER / Table API Row
> >>> Windows
> >>>>>>> for streaming tables
> >>>>>>>
> >>>>>>> Hi Haohui,
> >>>>>>>
> >>>>>>> our plan was in fact to piggy-back on Calcite and use the TUMBLE
> >>>>>> function [1] once is it is available (CALCITE-1345 [2]).
> >>>>>>> Unfortunately, this issue does not seem to be very active, so I
> >>> don't
> >>>>>> know what the progress is.
> >>>>>>>
> >>>>>>> I would suggest to move the discussion about group windows to a
> >>>>> separate
> >>>>>> thread and keep this one focused on the organization of the SQL
> >> OVER
> >>>>>> windows.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Fabian
> >>>>>>>
> >>>>>>> [1] http://calcite.apache.org/docs/stream.html)
> >>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345
> >>>>>>>
> >>>>>>> 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
> >>>>>>>
> >>>>>>>> Hi Fabian,
> >>>>>>>>
> >>>>>>>> FLINK-4692 has added the support for tumbling window and we are
> >>>>>>>> excited to try it out and expose it as a SQL construct.
> >>>>>>>>
> >>>>>>>> Just curious -- what's your thought on the SQL syntax on
> >> tumbling
> >>>>>> window?
> >>>>>>>>
> >>>>>>>> Implementation wise it might make sense to think tumbling window
> >>> as
> >>>> a
> >>>>>>>> special case of the sliding window.
> >>>>>>>>
> >>>>>>>> The problem I see is that the OVER construct might be
> >> insufficient
> >>>> to
> >>>>>>>> support all the use cases of tumbling windows. For example, it
> >>> fails
> >>>>>>>> to express tumbling windows that have fractional time units (as
> >>>>>>>> pointed out in http://calcite.apache.org/docs/stream.html).
> >>>>>>>>
> >>>>>>>> It looks to me that the Calcite / Azure Stream Analytics have
> >>>>>>>> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address
> >>> this
> >>>>>> issue.
> >>>>>>>>
> >>>>>>>> Do you think it is a good idea to follow the same conventions?
> >>> Your
> >>>>>>>> ideas are appreciated.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Haohui
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <[hidden email]>
> >>>>> wrote:
> >>>>>>>>
> >>>>>>>>> +1
> >>>>>>>>>
> >>>>>>>>> We are also quite interested in these features and would love
> >> to
> >>>>>>>>> participate and contribute.
> >>>>>>>>>
> >>>>>>>>> ~Haohui
> >>>>>>>>>
> >>>>>>>>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <
> >> [hidden email]
> >>>>
> >>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi everybody,
> >>>>>>>>>>
> >>>>>>>>>> it seems that currently several contributors are working on
> >> new
> >>>>>>>>>> features for the streaming Table API / SQL around row windows
> >>> (as
> >>>>>>>>>> defined in
> >>>>>>>>>> FLIP-11
> >>>>>>>>>> [1]) and SQL OVER-style window (FLINK-4678, FLINK-4679,
> >>>> FLINK-4680,
> >>>>>>>>>> FLINK-5584).
> >>>>>>>>>> Since these efforts overlap quite a bit I spent some time
> >>> thinking
> >>>>>>>>>> about how we can approach these features and how to avoid
> >>>>>>>>>> overlapping contributions.
> >>>>>>>>>>
> >>>>>>>>>> The challenge here is the following. Some of the Table API row
> >>>>>>>>>> windows
> >>>>>>>> as
> >>>>>>>>>> defined by FLIP-11 [1] are basically SQL OVER windows while
> >>> other
> >>>>>>>>>> cannot be easily expressed as such (TumbleRows for row-count
> >>>>>>>>>> intervals, SessionRows).
> >>>>>>>>>> However, since Calcite already supports SQL OVER windows, we
> >> can
> >>>>>>>>>> reuse
> >>>>>>>> the
> >>>>>>>>>> optimization logic for some of the Table API row windows. I
> >> also
> >>>>>>>>>> thought about the semantics of the TumbleRows and SessionRows
> >>>>>>>>>> windows as defined in
> >>>>>>>>>> FLIP-11 and came to the conclusion that these are not well
> >>> defined
> >>>>>>>>>> in
> >>>>>>>>>> FLIP-11 and should rather be defined as SlideRows windows
> >> with a
> >>>>>>>>>> special PARTITION BY clause.
> >>>>>>>>>>
> >>>>>>>>>> I propose to approach SQL OVER windows and Table API row
> >> windows
> >>>> as
> >>>>>>>>>> follows:
> >>>>>>>>>>
> >>>>>>>>>> We start with three simple cases for SQL OVER windows (not
> >> Table
> >>>>>>>>>> API
> >>>>>>>> yet):
> >>>>>>>>>>
> >>>>>>>>>> * OVER RANGE for event time
> >>>>>>>>>> * OVER RANGE for processing time
> >>>>>>>>>> * OVER ROW for processing time
> >>>>>>>>>>
> >>>>>>>>>> All cases fulfill the following restrictions:
> >>>>>>>>>> - All aggregations in SELECT must refer to the same window.
> >>>>>>>>>> - PARTITION BY may not contain the rowtime attribute.
> >>>>>>>>>> - ORDER BY must be on rowtime attribute (for event time) or
> >> on a
> >>>>>>>>>> marker function that indicates processing time. Additional
> >> sort
> >>>>>>>>>> attributes are not supported initially.
> >>>>>>>>>> - only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and
> >>> "BETWEEN
> >>>> x
> >>>>>>>>>> PRECEDING AND CURRENT ROW" are supported.
> >>>>>>>>>>
> >>>>>>>>>> OVER ROW for event time cannot be easily supported. With event
> >>>>>>>>>> time, we may have late records which need to be injected into
> >>> the
> >>>>>>>>>> order of records.
> >>>>>>>>>> When
> >>>>>>>>>> a record in injected in to the order where a row-count window
> >>> has
> >>>>>>>> already
> >>>>>>>>>> been computed, this and all following windows will change. We
> >>>> could
> >>>>>>>> either
> >>>>>>>>>> drop the record or sent out many retraction records. I think
> >> it
> >>> is
> >>>>>>>>>> best
> >>>>>>>> to
> >>>>>>>>>> not open this can of worms at this point.
> >>>>>>>>>>
> >>>>>>>>>> The rational for all of the above restrictions is to have
> >> first
> >>>>>>>>>> versions of OVER windows soon.
> >>>>>>>>>> Once we have the above cases covered we can extend and remove
> >>>>>>>> limitations
> >>>>>>>>>> as follows:
> >>>>>>>>>>
> >>>>>>>>>> - Table API SlideRow windows (with the same restrictions as
> >>>> above).
> >>>>>>>>>> This will be mostly API work since the execution part has been
> >>>>> solved
> >>>>>> before.
> >>>>>>>>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> >>>>>>>>>> - Add support for different windows in SELECT. All windows
> >> must
> >>> be
> >>>>>>>>>> partitioned and ordered in the same way.
> >>>>>>>>>> - Add support for additional ORDER BY attributes (besides
> >> time).
> >>>>>>>>>>
> >>>>>>>>>> As I said before, TumbleRows and SessionRows windows as in
> >>> FLIP-11
> >>>>>>>>>> are
> >>>>>>>> not
> >>>>>>>>>> well defined, IMO.
> >>>>>>>>>> They can be expressed as SlideRows windows with special
> >>>>>>>>>> partitioning (partitioning on fixed, non-overlapping time
> >> ranges
> >>>>>>>>>> for TumbleRows, and gap-separated, non-overlapping time ranges
> >>> for
> >>>>>>>>>> SessionRows) I would not start to work on those yet.
> >>>>>>>>>>
> >>>>>>>>>> I would like to close all related JIRA issues (FLINK-4678,
> >>>>>>>>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the
> >>>> development
> >>>>>>>>>> of these
> >>>>>>>> features
> >>>>>>>>>> as outlined above with corresponding JIRA issues.
> >>>>>>>>>>
> >>>>>>>>>> What do others think? (I cc'ed the contributors assigned to
> >> the
> >>>>>>>>>> above
> >>>>>>>> JIRA
> >>>>>>>>>> issues)
> >>>>>>>>>>
> >>>>>>>>>> Best, Fabian
> >>>>>>>>>>
> >>>>>>>>>> [1]
> >>>>>>>>>>
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> >>>>>>>> 11%3A+Table+API+Stream+Aggregations
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>
Reply | Threaded
Open this post in threaded view
|

RE: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Radu Tudoran
Thanks for this redesign Fabian,


I am interested in "- FLINK-5654: processing time OVER RANGE x PRECEDING"

However, I though the issue number is
https://issues.apache.org/jira/browse/FLINK-5654
am I wrong?

As you proposed  I will move the discussion about your remark in the comment section for this issue.


-----Original Message-----
From: Fabian Hueske [mailto:[hidden email]]
Sent: Thursday, January 26, 2017 3:14 PM
To: [hidden email]
Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Hi everybody,

I created the following JIRAs:

- FLINK-5653: processing time OVER ROWS x PRECEDING
- FLINK-5654: processing time OVER RANGE x PRECEDING
- FLINK-5655: event time OVER RANGE x PRECEDING

- FLINK-5656: processing time OVER ROWS UNBOUNDED PRECEDING
- FLINK-5657: processing time OVER RANGE UNBOUNDED PRECEDING
- FLINK-5658: event time OVER RANGE UNBOUNDED PRECEDING

Let's move the discussions about the design of the runtime operators to these issues.

Since some of you have already started working on some of the issues, it would be good if you could pick the ones you plan to work on.
If there are overlapping interests, it would be great to collaborate, e.g, design, coding, testing, code review.

A few more comments:

@Shaoxuan: You are right, we can implement the processing time windows with ProcessFunction as well. A GlobalWindow is essentially a FIFO queue of arriving records. With custom triggers and evictors, we could implement the functionality of the processing time OVER windows. We could ask Aljoscha (he knows every detail of Flink's window framework) if a ProcessFunction has more optimization potential than a GlobalWindow.

@Jark: That's a good point. We need logic to compute non-retractable aggregation functions as well.

@Radu: So far we had very coarse grained DataStreamRelNodes (e.g., DataStreamAggregate implements tumbling, sliding, and session windows for processing and event time). However, it might make sense to start implementing more fine-grained DataStreamRelNodes.

I'll go ahead and modify the previous JIRAs about SlideRow, TumbleRow, and SessionRow to explicitly address the Table API and how they relate to the new JIRAs.

Best,
Fabian

2017-01-26 7:05 GMT+01:00 Jark Wu <[hidden email]>:

> Hi Fabian,
>
> I completely aggree with the six JIRAs and different runtime
> implementations.
> And I also aggree with @shaoxuan's proposal can work for both
> processing time and event time.
>
> Hi Shaoxuan,
>
> I really like the idea you proposed that using retraction to decrease
> computation.
> It's a great optimization for incremental aggregation (only one
> reduced value is kept).
> But may not work for non-incremental aggregation (e.g. max, min, and
> median) which
> needs to buffer all the records in the group & window, and recalculate
> all the records when retraction happen. That means we will get a worse
> performance for non-incremental  aggregations when using retraction
> optimization here.
>
> IMO, we still need a general design for OVER window as following:
>
> 1. we buffer records in a list state (maybe sorted) for each group 2.
> when an over window is up to trigger, create an accumulator and
> accumulate all
>     the records in the boundary of the list state.
> 3. emit the aggregate result and delete the accumulator.
>
> And the retraction mechanism that keeps the accumulator for the whole
> life without deleting, could be implemented as an optimization on it
> for increamental aggregations.
>
> Regards, Jark
>
>
> > 在 2017年1月26日,上午11:42,Shaoxuan Wang <[hidden email]> 写道:
> >
> > Yes Fabian,
> > I will complete my design with more thorough thoughts. BTW, I think
> > the incremental aggregate (the key point I suggested is to eliminate
> > state
> per
> > each window) I proposed should work for both processing time and
> > event time. It just does not need a sorted state for the processing
> > time scenarios. (Need to verify).
> >
> > Regards,
> > Shaoxuan
> >
> >
> > On Wed, Jan 25, 2017 at 5:55 PM, Fabian Hueske <[hidden email]>
> wrote:
> >
> >> Hi everybody,
> >>
> >> thanks for the great discussions so far. It's awesome to see so
> >> much interest in this topic!
> >>
> >> First, I'd like to comment on the development process for this
> >> feature
> and
> >> later on the design of the runtime:
> >>
> >> Dev Process
> >> ----
> >> @Shaoxuan, I completely agree with you. We should first come up
> >> with
> good
> >> designs for the runtime operators of the different window types.
> >> Once we have that, we can start implementing the operators and
> >> integrate them
> with
> >> Calcite's optimization. This will be an intermediate step and as a
> >> byproduct give us support for SQL OVER windows. Once this is done,
> >> we
> can
> >> extend the Table API and translate the Table API calls into the
> >> same RelNodes as Calcite's SQL parser does.
> >>
> >> Runtime Design
> >> ----
> >> I think it makes sense to distinguish the different types of OVER
> windows
> >> because they have different requirements which result in different
> runtime
> >> implementations (with different implementation complexity and
> performance).
> >> In a previous mail I proposed to split the support for OVER windows
> >> into the following subtasks:
> >>
> >> # bounded PRECEDING
> >> - OVER ROWS for processing time
> >>  - does not require sorted state (data always arrives in processing
> >> time
> >> order)
> >>  - no need to consider retraction (processing time is never late)
> >>  - defines windows on row count.
> >>  - A GlobalWindow with evictor + trigger might be the best
> implementation
> >> (basically the same as DataStream.countWindow(long, long). We need
> >> to
> add
> >> timeouts to clean up state for non-used keys though.
> >>
> >> - OVER RANGE for processing time
> >>  - does not require sorted state (data always arrives in processing
> >> time
> >> order)
> >>  - no need to consider retraction (processing time is never late)
> >>  - defines windows on row count
> >>  - I think this could also be implemented with a GlobalWindow with
> evictor
> >> + trigger (need to verify)
> >>
> >> - OVER RANGE for event time
> >>  - need for sorted state (late data possible)
> >>  - IMO, a ProcessFunction gives us the most flexibility in adding
> >> later features (retraction, update rate, etc.)
> >>  - @Shaoxuan, you sketched a good design. Would you like to
> >> continue
> with
> >> a design proposal?
> >>
> >> # UNBOUNDED PRECEDING
> >> Similar considerations apply for the UNBOUNDED PRECEDING cases of
> >> the
> above
> >> window types.
> >>
> >> If we all agree that the separation into six JIRAs
> >> (bounded/unbounded * row-pt/range-pt/ range-et) makes sense, I
> >> would suggest to move the discussions about the design of the
> >> implementation to the individual
> JIRAs.
> >>
> >> What do think?
> >>
> >> Best, Fabian
> >>
> >> 2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <[hidden email]>:
> >>
> >>> Hi Liuxinchun,
> >>> I am not sure where did you get the inception: anyone has
> >>> suggested "to process Event time window in Sliding Row Window". If
> >>> you were referring
> >> my
> >>> post, there may be some misunderstanding there. I think you were
> >>> asking
> >> the
> >>> similar question as Hongyuhong. I have just replied to him. Please
> take a
> >>> look and let me know if that makes sense to you. "Retraction" is
> >>> an important building block to compute correct incremental results
> >>> in streaming. It is another big topic, we should discuss this in
> >>> another thread.
> >>>
> >>> Regards,
> >>> Shaoxuan
> >>>
> >>>
> >>>
> >>> On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun
> >>> <[hidden email]>
> >> wrote:
> >>>
> >>>> I don't think it is a good idea to process Event time window in
> Sliding
> >>>> Row Window. In Sliding Time window, when an element is late, we
> >>>> can
> >>> trigger
> >>>> the recalculation of the related windows. And the sliding period
> >>>> is coarse-gained, We only need to recalculate size/sliding number
> >>>> of
> >>> windows.
> >>>> But in Sliding Row Window, the calculation is triggered when
> >>>> every
> >>> element
> >>>> is coming. The sliding period is becoming fine-gained. When an
> >>>> element
> >> is
> >>>> late, there are so many "windows" are influenced. Even if we
> >>>> store all
> >>> the
> >>>> raw data, the computation is very large.
> >>>>
> >>>> I think if it is possible to set a standard to sliding Event Time
> >>>> Row Window, When certain elements are late, we can only
> >>>> recalculate
> partial
> >>>> windows and permit some error. For example, we can only
> >>>> recalculate
> the
> >>>> windows end in range between (lateElement.timestamp - leftDelta,
> >>>> lateElement.timestamp] and those windows begin in range between
> >>>> [lateElement.timestamp, lateElement.timestamp + rightDelta).
> >>>> ////////////////////////////////////////////////////////////
> >>>> //////////////////////////
> >>>> Hi everyone,
> >>>> Thanks for this great discussion, and glad to see more and more
> >>>> people
> >>> are
> >>>> interested on stream SQL & tableAPI.
> >>>>
> >>>> IMO, the key problems for Over window design are the SQL
> >>>> semantics and
> >>> the
> >>>> runtime design. I totally agree with Fabian that we should skip
> >>>> the
> >>> design
> >>>> of TumbleRows and SessionRows windows for now, as they are not
> >>>> well
> >>> defined
> >>>> in SQL semantics.
> >>>>
> >>>> Runtime design is the most crucial part we are interested in and
> >>>> volunteered to contribute into. We have thousands of machines
> >>>> running
> >>> flink
> >>>> streaming jobs. The costs in terms of CPU, memory, and state are
> >>>> the
> >>> vital
> >>>> factors that we have to taken into account. We have been working
> >>>> on
> the
> >>>> design of OVER window in the past months, and planning to send
> >>>> out a detailed design doc to DEV quite soon. But since Fabian
> >>>> started a good discussion on OVER window, I would like to share
> >>>> our ideas/thoughts
> >> about
> >>>> the runtime design for OVER window.
> >>>>
> >>>>   1. As SunJincheng pointed out earlier, sliding window does not
> >>>> work
> >>> for
> >>>>   unbounded preceding, we need alternative approach for unbound
> >>>> over window.
> >>>>   2. Though sliding window may work for some cases of bounded window,
> >>>>   it is not very efficient thereby should not be used for production.
> >> To
> >>>> the
> >>>>   best of my understanding, the current runtime implementation of
> >>> sliding
> >>>>   window has not leveraged the concepts of state Panes yet. This
> >>>> means that
> >>>>   if we use sliding window for OVER window,  there will be a
> >>>> backend
> >>> state
> >>>>   created per each group (partition by) and each row, and
> >>>> whenever a
> >> new
> >>>>   record arrives, it will be accumulated to all the existing
> >>>> windows
> >>> that
> >>>> has
> >>>>   not been closed. This would cause quite a lot of overhead in
> >>>> terms
> >> of
> >>>> both
> >>>>   CPU and memory&state.
> >>>>   3. Fabian has mentioned an approach of leveraging “ProcessFunction”
> >>> and
> >>>>   a “sortedState”. I like this idea. The design details on this
> >>>> are
> >> not
> >>>> quite
> >>>>   clear yet. So I would like to add more thoughts on this. Regardless
> >>>>   which dataStream API we are going to use (it is very likely
> >>>> that we
> >>> need
> >>>>   a new API), we should come out with an optimal approach. The
> >>>> purpose
> >>> of
> >>>>   grouping window and over window is to partition the data, such
> >>>> that
> >> we
> >>>> can
> >>>>   generate the aggregate results. So when we talk about the
> >>>> design of
> >>> OVER
> >>>>   window, we have to think about the aggregates. As we proposed
> >>>> in our recent
> >>>>   UDAGG doc https://goo.gl/6ntclB,  the user defined accumulator
> >>>> will
> >>> be
> >>>>   stored in the aggregate state. Besides accumulator, we have
> >>>> also introduced
> >>>>   a retract API for UDAGG. With aggregate accumulator and retract
> >> API, I
> >>>> am
> >>>>   proposing a runtime approach to implement the OVER window as
> >>> followings.
> >>>>   4.
> >>>>      - We first implement a sorted state interface
> >>>>      - Per each group, we just create one sorted state. When a
> >>>> new
> >>> record
> >>>>      arrives, it will insert into this sorted state, in the
> >>>> meanwhile
> >> it
> >>>> will be
> >>>>      accumulated to the aggregate accumulator.
> >>>>      - For over window, we keep the aggregate accumulator for the
> >> entire
> >>>>      job lifelong time. This is different than the case where we
> >> delete
> >>>> the
> >>>>      accumulator for each group/window when a grouping-window is
> >>> finished.
> >>>>      - When an over window is up to trigger, we grab the
> >>>>      previous accumulator from the state and accumulate values
> >>>> onto it with all
> >>>>      the records till the upperBoundary of the current window,
> >>>> and retract all
> >>>>      the out of scope records till its lowerBoundary. We emit the
> >>>>      aggregate result and save the accumulator for the next window.
> >>>>
> >>>>
> >>>> Hello Fabian,
> >>>> I would suggest we should first start working on runtime design
> >>>> of
> over
> >>>> window and aggregate. Once we have a good design there, one can
> >>>> easily
> >>> add
> >>>> the support for SQL as well as tableAPI. What do you think?
> >>>>
> >>>> Regards,
> >>>> Shaoxuan
> >>>>
> >>>> On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske
> >>>> <[hidden email]>
> >>> wrote:
> >>>>
> >>>>> Hi Radu,
> >>>>>
> >>>>> thanks for your comments!
> >>>>>
> >>>>> Yes, my intention is to open new JIRA issues to structure the
> >>>>> development process. Everybody is very welcome to pick up issues
> >>>>> and discuss the design proposals.
> >>>>> At the moment I see the following six issues to start with:
> >>>>>
> >>>>> - streaming SQL OVER ROW for processing time
> >>>>>  - bounded PRECEDING
> >>>>>  - unbounded PRECEDING
> >>>>>
> >>>>> - streaming SQL OVER RANGE for processing time
> >>>>>  - bounded PRECEDING
> >>>>>  - unbounded PRECEDING
> >>>>>
> >>>>> - streaming SQL OVER RANGE for event time
> >>>>>  - bounded PRECEDING
> >>>>>  - unbounded PRECEDING
> >>>>>
> >>>>> For each of these windows we need corresponding translation
> >>>>> rules and execution code.
> >>>>>
> >>>>> Subsequent JIRAs would be
> >>>>> - extending the Table API for supported SQL windows
> >>>>> - add support for FOLLOWING
> >>>>> - etc.
> >>>>>
> >>>>> Regarding the requirement for a sorted state. I am not sure if
> >>>>> the OVER windows should be implemented using Flink's DataStream
> >>>>> window
> >>>> framework.
> >>>>> We need a good design document to figure out what is the best
> >>>>> approach. A ProcessFunction with a sorted state might be a good
> >>> solution
> >>>> as well.
> >>>>>
> >>>>> Best, Fabian
> >>>>>
> >>>>>
> >>>>> 2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]>:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> Thanks for starting these discussion - it is very useful.
> >>>>>> It does make sense indeed to refactor all these and coordinate
> >>>>>> a
> >> bit
> >>>>>> the efforts not to have overlapping implementations and
> >> incompatible
> >>>>> solutions.
> >>>>>>
> >>>>>> If you close the 3 jira issues you mentioned - do you plan to
> >>>>>> redesign them and open new ones? Do you need help from our side
> >>>>>> -
> >> we
> >>>>>> can also pick the redesign of some of these new jira issues.
> >>>>>> For example we already
> >>>>> have
> >>>>>> an implementation for this and we can help with the design.
> >>>>>> Nevertheless, let's coordinate the effort.
> >>>>>>
> >>>>>> Regarding the support for the different types of window - I
> >>>>>> think the
> >>>>> best
> >>>>>> option is to split the implementation in small units. We can
> >>>>>> easily do
> >>>>> this
> >>>>>> from the transformation rule class and with this each
> >>>>>> particular type of window (session/sliding/sliderows/processing
> >>>>>> time/...)
> >> will
> >>>>>> have a clear implementation and a corresponding architecture
> >>>>>> within
> >>>> the jira issue?
> >>>>> What
> >>>>>> do you think about such a granularity?
> >>>>>>
> >>>>>> Regarding the issue of " Q4: The implementaion of SlideRows
> >>>>>> still need a custom operator that collects records in a
> >>>>>> priority queue ordered by the "rowtime", which is similar to
> >>>>>> the design we discussed in FLINK-4697, right? "
> >>>>>> Why would you need this operator? The window buffer can act to
> >>>>>> some
> >>>>> extent
> >>>>>> as a priority queue as long as the trigger and evictor is set
> >>>>>> to work
> >>>>> based
> >>>>>> on the rowtime - or maybe I am missing something... Can you
> >>>>>> please
> >>>>> clarify
> >>>>>> this.
> >>>>>>
> >>>>>>
> >>>>>> Dr. Radu Tudoran
> >>>>>> Senior Research Engineer - Big Data Expert IT R&D Division
> >>>>>>
> >>>>>>
> >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH European Research Center
> >>>>>> Riesstrasse 25, 80992 München
> >>>>>>
> >>>>>> E-mail: [hidden email]
> >>>>>> Mobile: +49 15209084330
> >>>>>> Telephone: +49 891588344173
> >>>>>>
> >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH Hansaallee 205, 40549
> >>>>>> Düsseldorf, Germany, www.huawei.com Registered Office:
> >>>>>> Düsseldorf, Register Court Düsseldorf, HRB
> >> 56063,
> >>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN Sitz der
> >>>>>> Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB
> >> 56063,
> >>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN This e-mail
> >>>>>> and its attachments contain confidential information
> >> from
> >>>>>> HUAWEI, which is intended only for the person or entity whose
> >> address
> >>>> is
> >>>>>> listed above. Any use of the information contained herein in
> >>>>>> any
> >> way
> >>>>>> (including, but not limited to, total or partial disclosure,
> >>>>> reproduction,
> >>>>>> or dissemination) by persons other than the intended
> >>>>>> recipient(s)
> >> is
> >>>>>> prohibited. If you receive this e-mail in error, please notify
> >>>>>> the
> >>>> sender
> >>>>>> by phone or email immediately and delete it!
> >>>>>>
> >>>>>>
> >>>>>> -----Original Message-----
> >>>>>> From: Jark Wu [mailto:[hidden email]]
> >>>>>> Sent: Tuesday, January 24, 2017 6:53 AM
> >>>>>> To: [hidden email]
> >>>>>> Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
> >>> Windows
> >>>>> for
> >>>>>> streaming tables
> >>>>>>
> >>>>>> Hi Fabian,
> >>>>>>
> >>>>>> Thanks for bringing up this discussion and the nice approach to
> >> avoid
> >>>>>> overlapping contributions.
> >>>>>>
> >>>>>> All of these make sense to me. But I have some questions.
> >>>>>>
> >>>>>> Q1: If I understand correctly, we will not support TumbleRows
> >>>>>> and SessionRows at the beginning. But maybe support them as a
> >>>>>> syntax
> >>> sugar
> >>>>> (in
> >>>>>> Table API) when the SlideRows is supported in the future. Right ?
> >>>>>>
> >>>>>> Q2: How to support SessionRows based on SlideRows ?  I don't
> >>>>>> get
> >> how
> >>> to
> >>>>>> partition on "gap-separated".
> >>>>>>
> >>>>>> Q3: Should we break down the approach into smaller tasks for
> >>> streaming
> >>>>>> tables and batch tables ?
> >>>>>>
> >>>>>> Q4: The implementaion of SlideRows still need a custom operator
> >> that
> >>>>>> collects records in a priority queue ordered by the "rowtime",
> >> which
> >>> is
> >>>>>> similar to the design we discussed in FLINK-4697, right?
> >>>>>>
> >>>>>> +1 not support for OVER ROW for event time at this point.
> >>>>>>
> >>>>>> Regards, Jark
> >>>>>>
> >>>>>>
> >>>>>>> 在 2017年1月24日,上午10:28,Hongyuhong <[hidden email]> 写道:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>> We are also interested in streaming sql and very willing to
> >>>> participate
> >>>>>> and contribute.
> >>>>>>>
> >>>>>>> We are now in progress and we will also contribute to calcite
> >>>>>>> to
> >>> push
> >>>>>> forward the window and stream-join support.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --------------
> >>>>>>> Sender: Fabian Hueske [mailto:[hidden email]] Send Time:
> >>>> 2017年1月24日
> >>>>>>> 5:55
> >>>>>>> Receiver: [hidden email]
> >>>>>>> Theme: Re: [DISCUSS] Development of SQL OVER / Table API Row
> >>> Windows
> >>>>>>> for streaming tables
> >>>>>>>
> >>>>>>> Hi Haohui,
> >>>>>>>
> >>>>>>> our plan was in fact to piggy-back on Calcite and use the
> >>>>>>> TUMBLE
> >>>>>> function [1] once is it is available (CALCITE-1345 [2]).
> >>>>>>> Unfortunately, this issue does not seem to be very active, so
> >>>>>>> I
> >>> don't
> >>>>>> know what the progress is.
> >>>>>>>
> >>>>>>> I would suggest to move the discussion about group windows to
> >>>>>>> a
> >>>>> separate
> >>>>>> thread and keep this one focused on the organization of the SQL
> >> OVER
> >>>>>> windows.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Fabian
> >>>>>>>
> >>>>>>> [1] http://calcite.apache.org/docs/stream.html)
> >>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345
> >>>>>>>
> >>>>>>> 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
> >>>>>>>
> >>>>>>>> Hi Fabian,
> >>>>>>>>
> >>>>>>>> FLINK-4692 has added the support for tumbling window and we
> >>>>>>>> are excited to try it out and expose it as a SQL construct.
> >>>>>>>>
> >>>>>>>> Just curious -- what's your thought on the SQL syntax on
> >> tumbling
> >>>>>> window?
> >>>>>>>>
> >>>>>>>> Implementation wise it might make sense to think tumbling
> >>>>>>>> window
> >>> as
> >>>> a
> >>>>>>>> special case of the sliding window.
> >>>>>>>>
> >>>>>>>> The problem I see is that the OVER construct might be
> >> insufficient
> >>>> to
> >>>>>>>> support all the use cases of tumbling windows. For example,
> >>>>>>>> it
> >>> fails
> >>>>>>>> to express tumbling windows that have fractional time units
> >>>>>>>> (as pointed out in http://calcite.apache.org/docs/stream.html).
> >>>>>>>>
> >>>>>>>> It looks to me that the Calcite / Azure Stream Analytics have
> >>>>>>>> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to
> >>>>>>>> address
> >>> this
> >>>>>> issue.
> >>>>>>>>
> >>>>>>>> Do you think it is a good idea to follow the same conventions?
> >>> Your
> >>>>>>>> ideas are appreciated.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Haohui
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai
> >>>>>>>> <[hidden email]>
> >>>>> wrote:
> >>>>>>>>
> >>>>>>>>> +1
> >>>>>>>>>
> >>>>>>>>> We are also quite interested in these features and would
> >>>>>>>>> love
> >> to
> >>>>>>>>> participate and contribute.
> >>>>>>>>>
> >>>>>>>>> ~Haohui
> >>>>>>>>>
> >>>>>>>>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <
> >> [hidden email]
> >>>>
> >>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi everybody,
> >>>>>>>>>>
> >>>>>>>>>> it seems that currently several contributors are working on
> >> new
> >>>>>>>>>> features for the streaming Table API / SQL around row
> >>>>>>>>>> windows
> >>> (as
> >>>>>>>>>> defined in
> >>>>>>>>>> FLIP-11
> >>>>>>>>>> [1]) and SQL OVER-style window (FLINK-4678, FLINK-4679,
> >>>> FLINK-4680,
> >>>>>>>>>> FLINK-5584).
> >>>>>>>>>> Since these efforts overlap quite a bit I spent some time
> >>> thinking
> >>>>>>>>>> about how we can approach these features and how to avoid
> >>>>>>>>>> overlapping contributions.
> >>>>>>>>>>
> >>>>>>>>>> The challenge here is the following. Some of the Table API
> >>>>>>>>>> row windows
> >>>>>>>> as
> >>>>>>>>>> defined by FLIP-11 [1] are basically SQL OVER windows while
> >>> other
> >>>>>>>>>> cannot be easily expressed as such (TumbleRows for
> >>>>>>>>>> row-count intervals, SessionRows).
> >>>>>>>>>> However, since Calcite already supports SQL OVER windows,
> >>>>>>>>>> we
> >> can
> >>>>>>>>>> reuse
> >>>>>>>> the
> >>>>>>>>>> optimization logic for some of the Table API row windows. I
> >> also
> >>>>>>>>>> thought about the semantics of the TumbleRows and
> >>>>>>>>>> SessionRows windows as defined in
> >>>>>>>>>> FLIP-11 and came to the conclusion that these are not well
> >>> defined
> >>>>>>>>>> in
> >>>>>>>>>> FLIP-11 and should rather be defined as SlideRows windows
> >> with a
> >>>>>>>>>> special PARTITION BY clause.
> >>>>>>>>>>
> >>>>>>>>>> I propose to approach SQL OVER windows and Table API row
> >> windows
> >>>> as
> >>>>>>>>>> follows:
> >>>>>>>>>>
> >>>>>>>>>> We start with three simple cases for SQL OVER windows (not
> >> Table
> >>>>>>>>>> API
> >>>>>>>> yet):
> >>>>>>>>>>
> >>>>>>>>>> * OVER RANGE for event time
> >>>>>>>>>> * OVER RANGE for processing time
> >>>>>>>>>> * OVER ROW for processing time
> >>>>>>>>>>
> >>>>>>>>>> All cases fulfill the following restrictions:
> >>>>>>>>>> - All aggregations in SELECT must refer to the same window.
> >>>>>>>>>> - PARTITION BY may not contain the rowtime attribute.
> >>>>>>>>>> - ORDER BY must be on rowtime attribute (for event time) or
> >> on a
> >>>>>>>>>> marker function that indicates processing time. Additional
> >> sort
> >>>>>>>>>> attributes are not supported initially.
> >>>>>>>>>> - only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and
> >>> "BETWEEN
> >>>> x
> >>>>>>>>>> PRECEDING AND CURRENT ROW" are supported.
> >>>>>>>>>>
> >>>>>>>>>> OVER ROW for event time cannot be easily supported. With
> >>>>>>>>>> event time, we may have late records which need to be
> >>>>>>>>>> injected into
> >>> the
> >>>>>>>>>> order of records.
> >>>>>>>>>> When
> >>>>>>>>>> a record in injected in to the order where a row-count
> >>>>>>>>>> window
> >>> has
> >>>>>>>> already
> >>>>>>>>>> been computed, this and all following windows will change.
> >>>>>>>>>> We
> >>>> could
> >>>>>>>> either
> >>>>>>>>>> drop the record or sent out many retraction records. I
> >>>>>>>>>> think
> >> it
> >>> is
> >>>>>>>>>> best
> >>>>>>>> to
> >>>>>>>>>> not open this can of worms at this point.
> >>>>>>>>>>
> >>>>>>>>>> The rational for all of the above restrictions is to have
> >> first
> >>>>>>>>>> versions of OVER windows soon.
> >>>>>>>>>> Once we have the above cases covered we can extend and
> >>>>>>>>>> remove
> >>>>>>>> limitations
> >>>>>>>>>> as follows:
> >>>>>>>>>>
> >>>>>>>>>> - Table API SlideRow windows (with the same restrictions as
> >>>> above).
> >>>>>>>>>> This will be mostly API work since the execution part has
> >>>>>>>>>> been
> >>>>> solved
> >>>>>> before.
> >>>>>>>>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> >>>>>>>>>> - Add support for different windows in SELECT. All windows
> >> must
> >>> be
> >>>>>>>>>> partitioned and ordered in the same way.
> >>>>>>>>>> - Add support for additional ORDER BY attributes (besides
> >> time).
> >>>>>>>>>>
> >>>>>>>>>> As I said before, TumbleRows and SessionRows windows as in
> >>> FLIP-11
> >>>>>>>>>> are
> >>>>>>>> not
> >>>>>>>>>> well defined, IMO.
> >>>>>>>>>> They can be expressed as SlideRows windows with special
> >>>>>>>>>> partitioning (partitioning on fixed, non-overlapping time
> >> ranges
> >>>>>>>>>> for TumbleRows, and gap-separated, non-overlapping time
> >>>>>>>>>> ranges
> >>> for
> >>>>>>>>>> SessionRows) I would not start to work on those yet.
> >>>>>>>>>>
> >>>>>>>>>> I would like to close all related JIRA issues (FLINK-4678,
> >>>>>>>>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the
> >>>> development
> >>>>>>>>>> of these
> >>>>>>>> features
> >>>>>>>>>> as outlined above with corresponding JIRA issues.
> >>>>>>>>>>
> >>>>>>>>>> What do others think? (I cc'ed the contributors assigned to
> >> the
> >>>>>>>>>> above
> >>>>>>>> JIRA
> >>>>>>>>>> issues)
> >>>>>>>>>>
> >>>>>>>>>> Best, Fabian
> >>>>>>>>>>
> >>>>>>>>>> [1]
> >>>>>>>>>>
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> >>>>>>>> 11%3A+Table+API+Stream+Aggregations
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>
Reply | Threaded
Open this post in threaded view
|

答复: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

shijinkui
In reply to this post by Fabian Hueske-2
hi,Fabian, sunjincheng

Today is the first workday of 2017 in China. When we come back, I found the SQL issues had been assigned between New Year...
Yuhong Hong is interest in FLINK-5657. She had implemented it before. Can we reconsider to assign FLINK-5657 to her?

Thanks
Jinkui Shi

[1] https://issues.apache.org/jira/browse/FLINK-4557

-----邮件原件-----
发件人: Fabian Hueske [mailto:[hidden email]]
发送时间: 2017年1月25日 17:55
收件人: [hidden email]
主题: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Hi everybody,

thanks for the great discussions so far. It's awesome to see so much interest in this topic!

First, I'd like to comment on the development process for this feature and later on the design of the runtime:

Dev Process
----
@Shaoxuan, I completely agree with you. We should first come up with good designs for the runtime operators of the different window types. Once we have that, we can start implementing the operators and integrate them with Calcite's optimization. This will be an intermediate step and as a byproduct give us support for SQL OVER windows. Once this is done, we can extend the Table API and translate the Table API calls into the same RelNodes as Calcite's SQL parser does.

Runtime Design
----
I think it makes sense to distinguish the different types of OVER windows because they have different requirements which result in different runtime implementations (with different implementation complexity and performance).
In a previous mail I proposed to split the support for OVER windows into the following subtasks:

# bounded PRECEDING
- OVER ROWS for processing time
  - does not require sorted state (data always arrives in processing time
order)
  - no need to consider retraction (processing time is never late)
  - defines windows on row count.
  - A GlobalWindow with evictor + trigger might be the best implementation (basically the same as DataStream.countWindow(long, long). We need to add timeouts to clean up state for non-used keys though.

- OVER RANGE for processing time
  - does not require sorted state (data always arrives in processing time
order)
  - no need to consider retraction (processing time is never late)
  - defines windows on row count
  - I think this could also be implemented with a GlobalWindow with evictor
+ trigger (need to verify)

- OVER RANGE for event time
  - need for sorted state (late data possible)
  - IMO, a ProcessFunction gives us the most flexibility in adding later features (retraction, update rate, etc.)
  - @Shaoxuan, you sketched a good design. Would you like to continue with a design proposal?

# UNBOUNDED PRECEDING
Similar considerations apply for the UNBOUNDED PRECEDING cases of the above window types.

If we all agree that the separation into six JIRAs (bounded/unbounded * row-pt/range-pt/ range-et) makes sense, I would suggest to move the discussions about the design of the implementation to the individual JIRAs.

What do think?

Best, Fabian

2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <[hidden email]>:

> Hi Liuxinchun,
> I am not sure where did you get the inception: anyone has suggested
> "to process Event time window in Sliding Row Window". If you were
> referring my post, there may be some misunderstanding there. I think
> you were asking the similar question as Hongyuhong. I have just
> replied to him. Please take a look and let me know if that makes sense
> to you. "Retraction" is an important building block to compute correct
> incremental results in streaming. It is another big topic, we should
> discuss this in another thread.
>
> Regards,
> Shaoxuan
>
>
>
> On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <[hidden email]> wrote:
>
> > I don't think it is a good idea to process Event time window in
> > Sliding Row Window. In Sliding Time window, when an element is late,
> > we can
> trigger
> > the recalculation of the related windows. And the sliding period is
> > coarse-gained, We only need to recalculate size/sliding number of
> windows.
> > But in Sliding Row Window, the calculation is triggered when every
> element
> > is coming. The sliding period is becoming fine-gained. When an
> > element is late, there are so many "windows" are influenced. Even if
> > we store all
> the
> > raw data, the computation is very large.
> >
> > I think if it is possible to set a standard to sliding Event Time
> > Row Window, When certain elements are late, we can only recalculate
> > partial windows and permit some error. For example, we can only
> > recalculate the windows end in range between (lateElement.timestamp
> > - leftDelta, lateElement.timestamp] and those windows begin in range
> > between [lateElement.timestamp, lateElement.timestamp + rightDelta).
> > ////////////////////////////////////////////////////////////
> > //////////////////////////
> >  Hi everyone,
> > Thanks for this great discussion, and glad to see more and more
> > people
> are
> > interested on stream SQL & tableAPI.
> >
> > IMO, the key problems for Over window design are the SQL semantics
> > and
> the
> > runtime design. I totally agree with Fabian that we should skip the
> design
> > of TumbleRows and SessionRows windows for now, as they are not well
> defined
> > in SQL semantics.
> >
> > Runtime design is the most crucial part we are interested in and
> > volunteered to contribute into. We have thousands of machines
> > running
> flink
> > streaming jobs. The costs in terms of CPU, memory, and state are the
> vital
> > factors that we have to taken into account. We have been working on
> > the design of OVER window in the past months, and planning to send
> > out a detailed design doc to DEV quite soon. But since Fabian
> > started a good discussion on OVER window, I would like to share our
> > ideas/thoughts about the runtime design for OVER window.
> >
> >    1. As SunJincheng pointed out earlier, sliding window does not
> > work
> for
> >    unbounded preceding, we need alternative approach for unbound
> > over window.
> >    2. Though sliding window may work for some cases of bounded window,
> >    it is not very efficient thereby should not be used for
> > production. To the
> >    best of my understanding, the current runtime implementation of
> sliding
> >    window has not leveraged the concepts of state Panes yet. This
> > means that
> >    if we use sliding window for OVER window,  there will be a
> > backend
> state
> >    created per each group (partition by) and each row, and whenever a new
> >    record arrives, it will be accumulated to all the existing
> > windows
> that
> > has
> >    not been closed. This would cause quite a lot of overhead in
> > terms of both
> >    CPU and memory&state.
> >    3. Fabian has mentioned an approach of leveraging “ProcessFunction”
> and
> >    a “sortedState”. I like this idea. The design details on this are
> > not quite
> >    clear yet. So I would like to add more thoughts on this. Regardless
> >    which dataStream API we are going to use (it is very likely that
> > we
> need
> >    a new API), we should come out with an optimal approach. The
> > purpose
> of
> >    grouping window and over window is to partition the data, such
> > that we can
> >    generate the aggregate results. So when we talk about the design
> > of
> OVER
> >    window, we have to think about the aggregates. As we proposed in
> > our recent
> >    UDAGG doc https://goo.gl/6ntclB,  the user defined accumulator
> > will
> be
> >    stored in the aggregate state. Besides accumulator, we have also
> > introduced
> >    a retract API for UDAGG. With aggregate accumulator and retract
> > API, I am
> >    proposing a runtime approach to implement the OVER window as
> followings.
> >    4.
> >       - We first implement a sorted state interface
> >       - Per each group, we just create one sorted state. When a new
> record
> >       arrives, it will insert into this sorted state, in the
> > meanwhile it will be
> >       accumulated to the aggregate accumulator.
> >       - For over window, we keep the aggregate accumulator for the entire
> >       job lifelong time. This is different than the case where we
> > delete the
> >       accumulator for each group/window when a grouping-window is
> finished.
> >       - When an over window is up to trigger, we grab the
> >       previous accumulator from the state and accumulate values onto
> > it with all
> >       the records till the upperBoundary of the current window, and
> > retract all
> >       the out of scope records till its lowerBoundary. We emit the
> >       aggregate result and save the accumulator for the next window.
> >
> >
> > Hello Fabian,
> > I would suggest we should first start working on runtime design of
> > over window and aggregate. Once we have a good design there, one can
> > easily
> add
> > the support for SQL as well as tableAPI. What do you think?
> >
> > Regards,
> > Shaoxuan
> >
> > On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <[hidden email]>
> wrote:
> >
> > > Hi Radu,
> > >
> > > thanks for your comments!
> > >
> > > Yes, my intention is to open new JIRA issues to structure the
> > > development process. Everybody is very welcome to pick up issues
> > > and discuss the design proposals.
> > > At the moment I see the following six issues to start with:
> > >
> > > - streaming SQL OVER ROW for processing time
> > >   - bounded PRECEDING
> > >   - unbounded PRECEDING
> > >
> > > - streaming SQL OVER RANGE for processing time
> > >   - bounded PRECEDING
> > >   - unbounded PRECEDING
> > >
> > > - streaming SQL OVER RANGE for event time
> > >   - bounded PRECEDING
> > >   - unbounded PRECEDING
> > >
> > > For each of these windows we need corresponding translation rules
> > > and execution code.
> > >
> > > Subsequent JIRAs would be
> > > - extending the Table API for supported SQL windows
> > > - add support for FOLLOWING
> > > - etc.
> > >
> > > Regarding the requirement for a sorted state. I am not sure if the
> > > OVER windows should be implemented using Flink's DataStream window
> > framework.
> > > We need a good design document to figure out what is the best
> > > approach. A ProcessFunction with a sorted state might be a good
> solution
> > as well.
> > >
> > > Best, Fabian
> > >
> > >
> > > 2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]>:
> > >
> > > > Hi all,
> > > >
> > > > Thanks for starting these discussion - it is very useful.
> > > > It does make sense indeed to refactor all these and coordinate a
> > > > bit the efforts not to have overlapping implementations and
> > > > incompatible
> > > solutions.
> > > >
> > > > If you close the 3 jira issues you mentioned - do you plan to
> > > > redesign them and open new ones? Do you need help from our side
> > > > - we can also pick the redesign of some of these new jira
> > > > issues. For example we already
> > > have
> > > > an implementation for this and we can help with the design.
> > > > Nevertheless, let's coordinate the effort.
> > > >
> > > > Regarding the support for the different types of window - I
> > > > think the
> > > best
> > > > option is to split the implementation in small units. We can
> > > > easily do
> > > this
> > > > from the transformation rule class and with this each particular
> > > > type of window (session/sliding/sliderows/processing time/...)
> > > > will have a clear implementation and a corresponding
> > > > architecture within
> > the jira issue?
> > > What
> > > > do you think about such a granularity?
> > > >
> > > > Regarding the issue of " Q4: The implementaion of SlideRows
> > > > still need a custom operator that collects records in a priority
> > > > queue ordered by the "rowtime", which is similar to the design
> > > > we discussed in FLINK-4697, right? "
> > > > Why would you need this operator? The window buffer can act to
> > > > some
> > > extent
> > > > as a priority queue as long as the trigger and evictor is set to
> > > > work
> > > based
> > > > on the rowtime - or maybe I am missing something... Can you
> > > > please
> > > clarify
> > > > this.
> > > >
> > > >
> > > > Dr. Radu Tudoran
> > > > Senior Research Engineer - Big Data Expert IT R&D Division
> > > >
> > > >
> > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH European Research Center
> > > > Riesstrasse 25, 80992 München
> > > >
> > > > E-mail: [hidden email]
> > > > Mobile: +49 15209084330
> > > > Telephone: +49 891588344173
> > > >
> > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH Hansaallee 205, 40549
> > > > Düsseldorf, Germany, www.huawei.com Registered Office:
> > > > Düsseldorf, Register Court Düsseldorf, HRB 56063, Managing
> > > > Director: Bo PENG, Wanzhou MENG, Lifang CHEN Sitz der
> > > > Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > > > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN This e-mail
> > > > and its attachments contain confidential information from
> > > > HUAWEI, which is intended only for the person or entity whose
> > > > address
> > is
> > > > listed above. Any use of the information contained herein in any
> > > > way (including, but not limited to, total or partial disclosure,
> > > reproduction,
> > > > or dissemination) by persons other than the intended
> > > > recipient(s) is prohibited. If you receive this e-mail in error,
> > > > please notify the
> > sender
> > > > by phone or email immediately and delete it!
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: Jark Wu [mailto:[hidden email]]
> > > > Sent: Tuesday, January 24, 2017 6:53 AM
> > > > To: [hidden email]
> > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
> Windows
> > > for
> > > > streaming tables
> > > >
> > > > Hi Fabian,
> > > >
> > > > Thanks for bringing up this discussion and the nice approach to
> > > > avoid overlapping contributions.
> > > >
> > > > All of these make sense to me. But I have some questions.
> > > >
> > > > Q1: If I understand correctly, we will not support TumbleRows
> > > > and SessionRows at the beginning. But maybe support them as a
> > > > syntax
> sugar
> > > (in
> > > > Table API) when the SlideRows is supported in the future. Right ?
> > > >
> > > > Q2: How to support SessionRows based on SlideRows ?  I don't get
> > > > how
> to
> > > > partition on "gap-separated".
> > > >
> > > > Q3: Should we break down the approach into smaller tasks for
> streaming
> > > > tables and batch tables ?
> > > >
> > > > Q4: The implementaion of SlideRows still need a custom operator
> > > > that collects records in a priority queue ordered by the
> > > > "rowtime", which
> is
> > > > similar to the design we discussed in FLINK-4697, right?
> > > >
> > > > +1 not support for OVER ROW for event time at this point.
> > > >
> > > > Regards, Jark
> > > >
> > > >
> > > > > 在 2017年1月24日,上午10:28,Hongyuhong <[hidden email]> 写道:
> > > > >
> > > > > Hi,
> > > > > We are also interested in streaming sql and very willing to
> > participate
> > > > and contribute.
> > > > >
> > > > > We are now in progress and we will also contribute to calcite
> > > > > to
> push
> > > > forward the window and stream-join support.
> > > > >
> > > > >
> > > > >
> > > > > --------------
> > > > > Sender: Fabian Hueske [mailto:[hidden email]] Send Time:
> > 2017年1月24日
> > > > > 5:55
> > > > > Receiver: [hidden email]
> > > > > Theme: Re: [DISCUSS] Development of SQL OVER / Table API Row
> Windows
> > > > > for streaming tables
> > > > >
> > > > > Hi Haohui,
> > > > >
> > > > > our plan was in fact to piggy-back on Calcite and use the
> > > > > TUMBLE
> > > > function [1] once is it is available (CALCITE-1345 [2]).
> > > > > Unfortunately, this issue does not seem to be very active, so
> > > > > I
> don't
> > > > know what the progress is.
> > > > >
> > > > > I would suggest to move the discussion about group windows to
> > > > > a
> > > separate
> > > > thread and keep this one focused on the organization of the SQL
> > > > OVER windows.
> > > > >
> > > > > Best,
> > > > > Fabian
> > > > >
> > > > > [1] http://calcite.apache.org/docs/stream.html)
> > > > > [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > > > >
> > > > > 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
> > > > >
> > > > >> Hi Fabian,
> > > > >>
> > > > >> FLINK-4692 has added the support for tumbling window and we
> > > > >> are excited to try it out and expose it as a SQL construct.
> > > > >>
> > > > >> Just curious -- what's your thought on the SQL syntax on
> > > > >> tumbling
> > > > window?
> > > > >>
> > > > >> Implementation wise it might make sense to think tumbling
> > > > >> window
> as
> > a
> > > > >> special case of the sliding window.
> > > > >>
> > > > >> The problem I see is that the OVER construct might be
> > > > >> insufficient
> > to
> > > > >> support all the use cases of tumbling windows. For example,
> > > > >> it
> fails
> > > > >> to express tumbling windows that have fractional time units
> > > > >> (as pointed out in http://calcite.apache.org/docs/stream.html).
> > > > >>
> > > > >> It looks to me that the Calcite / Azure Stream Analytics have
> > > > >> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to
> > > > >> address
> this
> > > > issue.
> > > > >>
> > > > >> Do you think it is a good idea to follow the same conventions?
> Your
> > > > >> ideas are appreciated.
> > > > >>
> > > > >> Regards,
> > > > >> Haohui
> > > > >>
> > > > >>
> > > > >> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai
> > > > >> <[hidden email]>
> > > wrote:
> > > > >>
> > > > >>> +1
> > > > >>>
> > > > >>> We are also quite interested in these features and would
> > > > >>> love to participate and contribute.
> > > > >>>
> > > > >>> ~Haohui
> > > > >>>
> > > > >>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske
> > > > >>> <[hidden email]
> >
> > > > wrote:
> > > > >>>
> > > > >>>> Hi everybody,
> > > > >>>>
> > > > >>>> it seems that currently several contributors are working on
> > > > >>>> new features for the streaming Table API / SQL around row
> > > > >>>> windows
> (as
> > > > >>>> defined in
> > > > >>>> FLIP-11
> > > > >>>> [1]) and SQL OVER-style window (FLINK-4678, FLINK-4679,
> > FLINK-4680,
> > > > >>>> FLINK-5584).
> > > > >>>> Since these efforts overlap quite a bit I spent some time
> thinking
> > > > >>>> about how we can approach these features and how to avoid
> > > > >>>> overlapping contributions.
> > > > >>>>
> > > > >>>> The challenge here is the following. Some of the Table API
> > > > >>>> row windows
> > > > >> as
> > > > >>>> defined by FLIP-11 [1] are basically SQL OVER windows while
> other
> > > > >>>> cannot be easily expressed as such (TumbleRows for
> > > > >>>> row-count intervals, SessionRows).
> > > > >>>> However, since Calcite already supports SQL OVER windows,
> > > > >>>> we can reuse
> > > > >> the
> > > > >>>> optimization logic for some of the Table API row windows. I
> > > > >>>> also thought about the semantics of the TumbleRows and
> > > > >>>> SessionRows windows as defined in
> > > > >>>> FLIP-11 and came to the conclusion that these are not well
> defined
> > > > >>>> in
> > > > >>>> FLIP-11 and should rather be defined as SlideRows windows
> > > > >>>> with a special PARTITION BY clause.
> > > > >>>>
> > > > >>>> I propose to approach SQL OVER windows and Table API row
> > > > >>>> windows
> > as
> > > > >>>> follows:
> > > > >>>>
> > > > >>>> We start with three simple cases for SQL OVER windows (not
> > > > >>>> Table API
> > > > >> yet):
> > > > >>>>
> > > > >>>> * OVER RANGE for event time
> > > > >>>> * OVER RANGE for processing time
> > > > >>>> * OVER ROW for processing time
> > > > >>>>
> > > > >>>> All cases fulfill the following restrictions:
> > > > >>>> - All aggregations in SELECT must refer to the same window.
> > > > >>>> - PARTITION BY may not contain the rowtime attribute.
> > > > >>>> - ORDER BY must be on rowtime attribute (for event time) or
> > > > >>>> on a marker function that indicates processing time.
> > > > >>>> Additional sort attributes are not supported initially.
> > > > >>>> - only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and
> "BETWEEN
> > x
> > > > >>>> PRECEDING AND CURRENT ROW" are supported.
> > > > >>>>
> > > > >>>> OVER ROW for event time cannot be easily supported. With
> > > > >>>> event time, we may have late records which need to be
> > > > >>>> injected into
> the
> > > > >>>> order of records.
> > > > >>>> When
> > > > >>>> a record in injected in to the order where a row-count
> > > > >>>> window
> has
> > > > >> already
> > > > >>>> been computed, this and all following windows will change.
> > > > >>>> We
> > could
> > > > >> either
> > > > >>>> drop the record or sent out many retraction records. I
> > > > >>>> think it
> is
> > > > >>>> best
> > > > >> to
> > > > >>>> not open this can of worms at this point.
> > > > >>>>
> > > > >>>> The rational for all of the above restrictions is to have
> > > > >>>> first versions of OVER windows soon.
> > > > >>>> Once we have the above cases covered we can extend and
> > > > >>>> remove
> > > > >> limitations
> > > > >>>> as follows:
> > > > >>>>
> > > > >>>> - Table API SlideRow windows (with the same restrictions as
> > above).
> > > > >>>> This will be mostly API work since the execution part has
> > > > >>>> been
> > > solved
> > > > before.
> > > > >>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> > > > >>>> - Add support for different windows in SELECT. All windows
> > > > >>>> must
> be
> > > > >>>> partitioned and ordered in the same way.
> > > > >>>> - Add support for additional ORDER BY attributes (besides time).
> > > > >>>>
> > > > >>>> As I said before, TumbleRows and SessionRows windows as in
> FLIP-11
> > > > >>>> are
> > > > >> not
> > > > >>>> well defined, IMO.
> > > > >>>> They can be expressed as SlideRows windows with special
> > > > >>>> partitioning (partitioning on fixed, non-overlapping time
> > > > >>>> ranges for TumbleRows, and gap-separated, non-overlapping
> > > > >>>> time ranges
> for
> > > > >>>> SessionRows) I would not start to work on those yet.
> > > > >>>>
> > > > >>>> I would like to close all related JIRA issues (FLINK-4678,
> > > > >>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the
> > development
> > > > >>>> of these
> > > > >> features
> > > > >>>> as outlined above with corresponding JIRA issues.
> > > > >>>>
> > > > >>>> What do others think? (I cc'ed the contributors assigned to
> > > > >>>> the above
> > > > >> JIRA
> > > > >>>> issues)
> > > > >>>>
> > > > >>>> Best, Fabian
> > > > >>>>
> > > > >>>> [1]
> > > > >>>>
> > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> > > > >> 11%3A+Table+API+Stream+Aggregations
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: 答复: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Fabian Hueske-2
Hi Jinkui Shi, Yuhong Hong, Sunjincheng,

I'd suggest to discuss this on the actual JIRA issue.
I think it would help to describe the design and status of the
implementation.

Thanks, Fabian

2017-02-06 3:24 GMT+01:00 shijinkui <[hidden email]>:

> hi,Fabian, sunjincheng
>
> Today is the first workday of 2017 in China. When we come back, I found
> the SQL issues had been assigned between New Year...
> Yuhong Hong is interest in FLINK-5657. She had implemented it before. Can
> we reconsider to assign FLINK-5657 to her?
>
> Thanks
> Jinkui Shi
>
> [1] https://issues.apache.org/jira/browse/FLINK-4557
>
> -----邮件原件-----
> 发件人: Fabian Hueske [mailto:[hidden email]]
> 发送时间: 2017年1月25日 17:55
> 收件人: [hidden email]
> 主题: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for
> streaming tables
>
> Hi everybody,
>
> thanks for the great discussions so far. It's awesome to see so much
> interest in this topic!
>
> First, I'd like to comment on the development process for this feature and
> later on the design of the runtime:
>
> Dev Process
> ----
> @Shaoxuan, I completely agree with you. We should first come up with good
> designs for the runtime operators of the different window types. Once we
> have that, we can start implementing the operators and integrate them with
> Calcite's optimization. This will be an intermediate step and as a
> byproduct give us support for SQL OVER windows. Once this is done, we can
> extend the Table API and translate the Table API calls into the same
> RelNodes as Calcite's SQL parser does.
>
> Runtime Design
> ----
> I think it makes sense to distinguish the different types of OVER windows
> because they have different requirements which result in different runtime
> implementations (with different implementation complexity and performance).
> In a previous mail I proposed to split the support for OVER windows into
> the following subtasks:
>
> # bounded PRECEDING
> - OVER ROWS for processing time
>   - does not require sorted state (data always arrives in processing time
> order)
>   - no need to consider retraction (processing time is never late)
>   - defines windows on row count.
>   - A GlobalWindow with evictor + trigger might be the best implementation
> (basically the same as DataStream.countWindow(long, long). We need to add
> timeouts to clean up state for non-used keys though.
>
> - OVER RANGE for processing time
>   - does not require sorted state (data always arrives in processing time
> order)
>   - no need to consider retraction (processing time is never late)
>   - defines windows on row count
>   - I think this could also be implemented with a GlobalWindow with evictor
> + trigger (need to verify)
>
> - OVER RANGE for event time
>   - need for sorted state (late data possible)
>   - IMO, a ProcessFunction gives us the most flexibility in adding later
> features (retraction, update rate, etc.)
>   - @Shaoxuan, you sketched a good design. Would you like to continue with
> a design proposal?
>
> # UNBOUNDED PRECEDING
> Similar considerations apply for the UNBOUNDED PRECEDING cases of the
> above window types.
>
> If we all agree that the separation into six JIRAs (bounded/unbounded *
> row-pt/range-pt/ range-et) makes sense, I would suggest to move the
> discussions about the design of the implementation to the individual JIRAs.
>
> What do think?
>
> Best, Fabian
>
> 2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <[hidden email]>:
>
> > Hi Liuxinchun,
> > I am not sure where did you get the inception: anyone has suggested
> > "to process Event time window in Sliding Row Window". If you were
> > referring my post, there may be some misunderstanding there. I think
> > you were asking the similar question as Hongyuhong. I have just
> > replied to him. Please take a look and let me know if that makes sense
> > to you. "Retraction" is an important building block to compute correct
> > incremental results in streaming. It is another big topic, we should
> > discuss this in another thread.
> >
> > Regards,
> > Shaoxuan
> >
> >
> >
> > On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <[hidden email]>
> wrote:
> >
> > > I don't think it is a good idea to process Event time window in
> > > Sliding Row Window. In Sliding Time window, when an element is late,
> > > we can
> > trigger
> > > the recalculation of the related windows. And the sliding period is
> > > coarse-gained, We only need to recalculate size/sliding number of
> > windows.
> > > But in Sliding Row Window, the calculation is triggered when every
> > element
> > > is coming. The sliding period is becoming fine-gained. When an
> > > element is late, there are so many "windows" are influenced. Even if
> > > we store all
> > the
> > > raw data, the computation is very large.
> > >
> > > I think if it is possible to set a standard to sliding Event Time
> > > Row Window, When certain elements are late, we can only recalculate
> > > partial windows and permit some error. For example, we can only
> > > recalculate the windows end in range between (lateElement.timestamp
> > > - leftDelta, lateElement.timestamp] and those windows begin in range
> > > between [lateElement.timestamp, lateElement.timestamp + rightDelta).
> > > ////////////////////////////////////////////////////////////
> > > //////////////////////////
> > >  Hi everyone,
> > > Thanks for this great discussion, and glad to see more and more
> > > people
> > are
> > > interested on stream SQL & tableAPI.
> > >
> > > IMO, the key problems for Over window design are the SQL semantics
> > > and
> > the
> > > runtime design. I totally agree with Fabian that we should skip the
> > design
> > > of TumbleRows and SessionRows windows for now, as they are not well
> > defined
> > > in SQL semantics.
> > >
> > > Runtime design is the most crucial part we are interested in and
> > > volunteered to contribute into. We have thousands of machines
> > > running
> > flink
> > > streaming jobs. The costs in terms of CPU, memory, and state are the
> > vital
> > > factors that we have to taken into account. We have been working on
> > > the design of OVER window in the past months, and planning to send
> > > out a detailed design doc to DEV quite soon. But since Fabian
> > > started a good discussion on OVER window, I would like to share our
> > > ideas/thoughts about the runtime design for OVER window.
> > >
> > >    1. As SunJincheng pointed out earlier, sliding window does not
> > > work
> > for
> > >    unbounded preceding, we need alternative approach for unbound
> > > over window.
> > >    2. Though sliding window may work for some cases of bounded window,
> > >    it is not very efficient thereby should not be used for
> > > production. To the
> > >    best of my understanding, the current runtime implementation of
> > sliding
> > >    window has not leveraged the concepts of state Panes yet. This
> > > means that
> > >    if we use sliding window for OVER window,  there will be a
> > > backend
> > state
> > >    created per each group (partition by) and each row, and whenever a
> new
> > >    record arrives, it will be accumulated to all the existing
> > > windows
> > that
> > > has
> > >    not been closed. This would cause quite a lot of overhead in
> > > terms of both
> > >    CPU and memory&state.
> > >    3. Fabian has mentioned an approach of leveraging “ProcessFunction”
> > and
> > >    a “sortedState”. I like this idea. The design details on this are
> > > not quite
> > >    clear yet. So I would like to add more thoughts on this. Regardless
> > >    which dataStream API we are going to use (it is very likely that
> > > we
> > need
> > >    a new API), we should come out with an optimal approach. The
> > > purpose
> > of
> > >    grouping window and over window is to partition the data, such
> > > that we can
> > >    generate the aggregate results. So when we talk about the design
> > > of
> > OVER
> > >    window, we have to think about the aggregates. As we proposed in
> > > our recent
> > >    UDAGG doc https://goo.gl/6ntclB,  the user defined accumulator
> > > will
> > be
> > >    stored in the aggregate state. Besides accumulator, we have also
> > > introduced
> > >    a retract API for UDAGG. With aggregate accumulator and retract
> > > API, I am
> > >    proposing a runtime approach to implement the OVER window as
> > followings.
> > >    4.
> > >       - We first implement a sorted state interface
> > >       - Per each group, we just create one sorted state. When a new
> > record
> > >       arrives, it will insert into this sorted state, in the
> > > meanwhile it will be
> > >       accumulated to the aggregate accumulator.
> > >       - For over window, we keep the aggregate accumulator for the
> entire
> > >       job lifelong time. This is different than the case where we
> > > delete the
> > >       accumulator for each group/window when a grouping-window is
> > finished.
> > >       - When an over window is up to trigger, we grab the
> > >       previous accumulator from the state and accumulate values onto
> > > it with all
> > >       the records till the upperBoundary of the current window, and
> > > retract all
> > >       the out of scope records till its lowerBoundary. We emit the
> > >       aggregate result and save the accumulator for the next window.
> > >
> > >
> > > Hello Fabian,
> > > I would suggest we should first start working on runtime design of
> > > over window and aggregate. Once we have a good design there, one can
> > > easily
> > add
> > > the support for SQL as well as tableAPI. What do you think?
> > >
> > > Regards,
> > > Shaoxuan
> > >
> > > On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <[hidden email]>
> > wrote:
> > >
> > > > Hi Radu,
> > > >
> > > > thanks for your comments!
> > > >
> > > > Yes, my intention is to open new JIRA issues to structure the
> > > > development process. Everybody is very welcome to pick up issues
> > > > and discuss the design proposals.
> > > > At the moment I see the following six issues to start with:
> > > >
> > > > - streaming SQL OVER ROW for processing time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > - streaming SQL OVER RANGE for processing time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > - streaming SQL OVER RANGE for event time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > For each of these windows we need corresponding translation rules
> > > > and execution code.
> > > >
> > > > Subsequent JIRAs would be
> > > > - extending the Table API for supported SQL windows
> > > > - add support for FOLLOWING
> > > > - etc.
> > > >
> > > > Regarding the requirement for a sorted state. I am not sure if the
> > > > OVER windows should be implemented using Flink's DataStream window
> > > framework.
> > > > We need a good design document to figure out what is the best
> > > > approach. A ProcessFunction with a sorted state might be a good
> > solution
> > > as well.
> > > >
> > > > Best, Fabian
> > > >
> > > >
> > > > 2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]>:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Thanks for starting these discussion - it is very useful.
> > > > > It does make sense indeed to refactor all these and coordinate a
> > > > > bit the efforts not to have overlapping implementations and
> > > > > incompatible
> > > > solutions.
> > > > >
> > > > > If you close the 3 jira issues you mentioned - do you plan to
> > > > > redesign them and open new ones? Do you need help from our side
> > > > > - we can also pick the redesign of some of these new jira
> > > > > issues. For example we already
> > > > have
> > > > > an implementation for this and we can help with the design.
> > > > > Nevertheless, let's coordinate the effort.
> > > > >
> > > > > Regarding the support for the different types of window - I
> > > > > think the
> > > > best
> > > > > option is to split the implementation in small units. We can
> > > > > easily do
> > > > this
> > > > > from the transformation rule class and with this each particular
> > > > > type of window (session/sliding/sliderows/processing time/...)
> > > > > will have a clear implementation and a corresponding
> > > > > architecture within
> > > the jira issue?
> > > > What
> > > > > do you think about such a granularity?
> > > > >
> > > > > Regarding the issue of " Q4: The implementaion of SlideRows
> > > > > still need a custom operator that collects records in a priority
> > > > > queue ordered by the "rowtime", which is similar to the design
> > > > > we discussed in FLINK-4697, right? "
> > > > > Why would you need this operator? The window buffer can act to
> > > > > some
> > > > extent
> > > > > as a priority queue as long as the trigger and evictor is set to
> > > > > work
> > > > based
> > > > > on the rowtime - or maybe I am missing something... Can you
> > > > > please
> > > > clarify
> > > > > this.
> > > > >
> > > > >
> > > > > Dr. Radu Tudoran
> > > > > Senior Research Engineer - Big Data Expert IT R&D Division
> > > > >
> > > > >
> > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH European Research Center
> > > > > Riesstrasse 25, 80992 München
> > > > >
> > > > > E-mail: [hidden email]
> > > > > Mobile: +49 15209084330
> > > > > Telephone: +49 891588344173
> > > > >
> > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH Hansaallee 205, 40549
> > > > > Düsseldorf, Germany, www.huawei.com Registered Office:
> > > > > Düsseldorf, Register Court Düsseldorf, HRB 56063, Managing
> > > > > Director: Bo PENG, Wanzhou MENG, Lifang CHEN Sitz der
> > > > > Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > > > > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN This e-mail
> > > > > and its attachments contain confidential information from
> > > > > HUAWEI, which is intended only for the person or entity whose
> > > > > address
> > > is
> > > > > listed above. Any use of the information contained herein in any
> > > > > way (including, but not limited to, total or partial disclosure,
> > > > reproduction,
> > > > > or dissemination) by persons other than the intended
> > > > > recipient(s) is prohibited. If you receive this e-mail in error,
> > > > > please notify the
> > > sender
> > > > > by phone or email immediately and delete it!
> > > > >
> > > > >
> > > > > -----Original Message-----
> > > > > From: Jark Wu [mailto:[hidden email]]
> > > > > Sent: Tuesday, January 24, 2017 6:53 AM
> > > > > To: [hidden email]
> > > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
> > Windows
> > > > for
> > > > > streaming tables
> > > > >
> > > > > Hi Fabian,
> > > > >
> > > > > Thanks for bringing up this discussion and the nice approach to
> > > > > avoid overlapping contributions.
> > > > >
> > > > > All of these make sense to me. But I have some questions.
> > > > >
> > > > > Q1: If I understand correctly, we will not support TumbleRows
> > > > > and SessionRows at the beginning. But maybe support them as a
> > > > > syntax
> > sugar
> > > > (in
> > > > > Table API) when the SlideRows is supported in the future. Right ?
> > > > >
> > > > > Q2: How to support SessionRows based on SlideRows ?  I don't get
> > > > > how
> > to
> > > > > partition on "gap-separated".
> > > > >
> > > > > Q3: Should we break down the approach into smaller tasks for
> > streaming
> > > > > tables and batch tables ?
> > > > >
> > > > > Q4: The implementaion of SlideRows still need a custom operator
> > > > > that collects records in a priority queue ordered by the
> > > > > "rowtime", which
> > is
> > > > > similar to the design we discussed in FLINK-4697, right?
> > > > >
> > > > > +1 not support for OVER ROW for event time at this point.
> > > > >
> > > > > Regards, Jark
> > > > >
> > > > >
> > > > > > 在 2017年1月24日,上午10:28,Hongyuhong <[hidden email]> 写道:
> > > > > >
> > > > > > Hi,
> > > > > > We are also interested in streaming sql and very willing to
> > > participate
> > > > > and contribute.
> > > > > >
> > > > > > We are now in progress and we will also contribute to calcite
> > > > > > to
> > push
> > > > > forward the window and stream-join support.
> > > > > >
> > > > > >
> > > > > >
> > > > > > --------------
> > > > > > Sender: Fabian Hueske [mailto:[hidden email]] Send Time:
> > > 2017年1月24日
> > > > > > 5:55
> > > > > > Receiver: [hidden email]
> > > > > > Theme: Re: [DISCUSS] Development of SQL OVER / Table API Row
> > Windows
> > > > > > for streaming tables
> > > > > >
> > > > > > Hi Haohui,
> > > > > >
> > > > > > our plan was in fact to piggy-back on Calcite and use the
> > > > > > TUMBLE
> > > > > function [1] once is it is available (CALCITE-1345 [2]).
> > > > > > Unfortunately, this issue does not seem to be very active, so
> > > > > > I
> > don't
> > > > > know what the progress is.
> > > > > >
> > > > > > I would suggest to move the discussion about group windows to
> > > > > > a
> > > > separate
> > > > > thread and keep this one focused on the organization of the SQL
> > > > > OVER windows.
> > > > > >
> > > > > > Best,
> > > > > > Fabian
> > > > > >
> > > > > > [1] http://calcite.apache.org/docs/stream.html)
> > > > > > [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > > > > >
> > > > > > 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
> > > > > >
> > > > > >> Hi Fabian,
> > > > > >>
> > > > > >> FLINK-4692 has added the support for tumbling window and we
> > > > > >> are excited to try it out and expose it as a SQL construct.
> > > > > >>
> > > > > >> Just curious -- what's your thought on the SQL syntax on
> > > > > >> tumbling
> > > > > window?
> > > > > >>
> > > > > >> Implementation wise it might make sense to think tumbling
> > > > > >> window
> > as
> > > a
> > > > > >> special case of the sliding window.
> > > > > >>
> > > > > >> The problem I see is that the OVER construct might be
> > > > > >> insufficient
> > > to
> > > > > >> support all the use cases of tumbling windows. For example,
> > > > > >> it
> > fails
> > > > > >> to express tumbling windows that have fractional time units
> > > > > >> (as pointed out in http://calcite.apache.org/docs/stream.html).
> > > > > >>
> > > > > >> It looks to me that the Calcite / Azure Stream Analytics have
> > > > > >> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to
> > > > > >> address
> > this
> > > > > issue.
> > > > > >>
> > > > > >> Do you think it is a good idea to follow the same conventions?
> > Your
> > > > > >> ideas are appreciated.
> > > > > >>
> > > > > >> Regards,
> > > > > >> Haohui
> > > > > >>
> > > > > >>
> > > > > >> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai
> > > > > >> <[hidden email]>
> > > > wrote:
> > > > > >>
> > > > > >>> +1
> > > > > >>>
> > > > > >>> We are also quite interested in these features and would
> > > > > >>> love to participate and contribute.
> > > > > >>>
> > > > > >>> ~Haohui
> > > > > >>>
> > > > > >>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske
> > > > > >>> <[hidden email]
> > >
> > > > > wrote:
> > > > > >>>
> > > > > >>>> Hi everybody,
> > > > > >>>>
> > > > > >>>> it seems that currently several contributors are working on
> > > > > >>>> new features for the streaming Table API / SQL around row
> > > > > >>>> windows
> > (as
> > > > > >>>> defined in
> > > > > >>>> FLIP-11
> > > > > >>>> [1]) and SQL OVER-style window (FLINK-4678, FLINK-4679,
> > > FLINK-4680,
> > > > > >>>> FLINK-5584).
> > > > > >>>> Since these efforts overlap quite a bit I spent some time
> > thinking
> > > > > >>>> about how we can approach these features and how to avoid
> > > > > >>>> overlapping contributions.
> > > > > >>>>
> > > > > >>>> The challenge here is the following. Some of the Table API
> > > > > >>>> row windows
> > > > > >> as
> > > > > >>>> defined by FLIP-11 [1] are basically SQL OVER windows while
> > other
> > > > > >>>> cannot be easily expressed as such (TumbleRows for
> > > > > >>>> row-count intervals, SessionRows).
> > > > > >>>> However, since Calcite already supports SQL OVER windows,
> > > > > >>>> we can reuse
> > > > > >> the
> > > > > >>>> optimization logic for some of the Table API row windows. I
> > > > > >>>> also thought about the semantics of the TumbleRows and
> > > > > >>>> SessionRows windows as defined in
> > > > > >>>> FLIP-11 and came to the conclusion that these are not well
> > defined
> > > > > >>>> in
> > > > > >>>> FLIP-11 and should rather be defined as SlideRows windows
> > > > > >>>> with a special PARTITION BY clause.
> > > > > >>>>
> > > > > >>>> I propose to approach SQL OVER windows and Table API row
> > > > > >>>> windows
> > > as
> > > > > >>>> follows:
> > > > > >>>>
> > > > > >>>> We start with three simple cases for SQL OVER windows (not
> > > > > >>>> Table API
> > > > > >> yet):
> > > > > >>>>
> > > > > >>>> * OVER RANGE for event time
> > > > > >>>> * OVER RANGE for processing time
> > > > > >>>> * OVER ROW for processing time
> > > > > >>>>
> > > > > >>>> All cases fulfill the following restrictions:
> > > > > >>>> - All aggregations in SELECT must refer to the same window.
> > > > > >>>> - PARTITION BY may not contain the rowtime attribute.
> > > > > >>>> - ORDER BY must be on rowtime attribute (for event time) or
> > > > > >>>> on a marker function that indicates processing time.
> > > > > >>>> Additional sort attributes are not supported initially.
> > > > > >>>> - only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and
> > "BETWEEN
> > > x
> > > > > >>>> PRECEDING AND CURRENT ROW" are supported.
> > > > > >>>>
> > > > > >>>> OVER ROW for event time cannot be easily supported. With
> > > > > >>>> event time, we may have late records which need to be
> > > > > >>>> injected into
> > the
> > > > > >>>> order of records.
> > > > > >>>> When
> > > > > >>>> a record in injected in to the order where a row-count
> > > > > >>>> window
> > has
> > > > > >> already
> > > > > >>>> been computed, this and all following windows will change.
> > > > > >>>> We
> > > could
> > > > > >> either
> > > > > >>>> drop the record or sent out many retraction records. I
> > > > > >>>> think it
> > is
> > > > > >>>> best
> > > > > >> to
> > > > > >>>> not open this can of worms at this point.
> > > > > >>>>
> > > > > >>>> The rational for all of the above restrictions is to have
> > > > > >>>> first versions of OVER windows soon.
> > > > > >>>> Once we have the above cases covered we can extend and
> > > > > >>>> remove
> > > > > >> limitations
> > > > > >>>> as follows:
> > > > > >>>>
> > > > > >>>> - Table API SlideRow windows (with the same restrictions as
> > > above).
> > > > > >>>> This will be mostly API work since the execution part has
> > > > > >>>> been
> > > > solved
> > > > > before.
> > > > > >>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> > > > > >>>> - Add support for different windows in SELECT. All windows
> > > > > >>>> must
> > be
> > > > > >>>> partitioned and ordered in the same way.
> > > > > >>>> - Add support for additional ORDER BY attributes (besides
> time).
> > > > > >>>>
> > > > > >>>> As I said before, TumbleRows and SessionRows windows as in
> > FLIP-11
> > > > > >>>> are
> > > > > >> not
> > > > > >>>> well defined, IMO.
> > > > > >>>> They can be expressed as SlideRows windows with special
> > > > > >>>> partitioning (partitioning on fixed, non-overlapping time
> > > > > >>>> ranges for TumbleRows, and gap-separated, non-overlapping
> > > > > >>>> time ranges
> > for
> > > > > >>>> SessionRows) I would not start to work on those yet.
> > > > > >>>>
> > > > > >>>> I would like to close all related JIRA issues (FLINK-4678,
> > > > > >>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the
> > > development
> > > > > >>>> of these
> > > > > >> features
> > > > > >>>> as outlined above with corresponding JIRA issues.
> > > > > >>>>
> > > > > >>>> What do others think? (I cc'ed the contributors assigned to
> > > > > >>>> the above
> > > > > >> JIRA
> > > > > >>>> issues)
> > > > > >>>>
> > > > > >>>> Best, Fabian
> > > > > >>>>
> > > > > >>>> [1]
> > > > > >>>>
> > > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> > > > > >> 11%3A+Table+API+Stream+Aggregations
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Shaoxuan Wang
In reply to this post by Fabian Hueske-2
 Sorry for the late response.

Hi Jark,
Thanks for raising a good question - my proposal “may not work for
non-incremental aggregation (e.g. max, min, and median)”, but I have
some different opinions.
Yes, I have proposed a concept of “accumulate on getValue” in my UDAGG
proposal https://goo.gl/6ntclB. But after giving this some thorough
thought, I think this concept is unfortunately not necessary. All
aggregates should be suitable for incremental aggregate (even for max, min
and median). One can choose to accumulate all records at the same time when
the window is completed. But it will still execute the accumulate method to
update the accumulator state for each record. The way it executes
accumulate function to accumulate each record already implies that
the aggregation is incremental. Whether it is accumulated once at each
record arrival (incremental) or accumulated all records when the window is
completed (non-incremental), really does not matter in terms of the
correctness and the complexity. On the other hand, the non-incremental
approach will introduce CPU jitter and latency overhead, so I would like to
propose to always apply incremental mode for all streaming aggregations.

Fabian,
Yes, my proposal will not work for the aggregate if it does not have
retract function. But I believe we can always implement the retract
functions (as it is just the opposite operation to accumulate, i.e. the
retract will exist if an accumulate exists) for all aggregates. If this
makes sense to all of you, I would like to propose that UDAGG should be
forced to provision the retract function.

What do you think? I have updated my UDAGG proposal and UDAGG Jira, we can
move the discussion there if you think that is more appropriate.

Regards,
Shaoxuan


On Thu, Jan 26, 2017 at 10:14 PM, Fabian Hueske <[hidden email]> wrote:

> Hi everybody,
>
> I created the following JIRAs:
>
> - FLINK-5653: processing time OVER ROWS x PRECEDING
> - FLINK-5654: processing time OVER RANGE x PRECEDING
> - FLINK-5655: event time OVER RANGE x PRECEDING
>
> - FLINK-5656: processing time OVER ROWS UNBOUNDED PRECEDING
> - FLINK-5657: processing time OVER RANGE UNBOUNDED PRECEDING
> - FLINK-5658: event time OVER RANGE UNBOUNDED PRECEDING
>
> Let's move the discussions about the design of the runtime operators to
> these issues.
>
> Since some of you have already started working on some of the issues, it
> would be good if you could pick the ones you plan to work on.
> If there are overlapping interests, it would be great to collaborate, e.g,
> design, coding, testing, code review.
>
> A few more comments:
>
> @Shaoxuan: You are right, we can implement the processing time windows with
> ProcessFunction as well. A GlobalWindow is essentially a FIFO queue of
> arriving records. With custom triggers and evictors, we could implement the
> functionality of the processing time OVER windows. We could ask Aljoscha
> (he knows every detail of Flink's window framework) if a ProcessFunction
> has more optimization potential than a GlobalWindow.
>
> @Jark: That's a good point. We need logic to compute non-retractable
> aggregation functions as well.
>
> @Radu: So far we had very coarse grained DataStreamRelNodes (e.g.,
> DataStreamAggregate implements tumbling, sliding, and session windows for
> processing and event time). However, it might make sense to start
> implementing more fine-grained DataStreamRelNodes.
>
> I'll go ahead and modify the previous JIRAs about SlideRow, TumbleRow, and
> SessionRow to explicitly address the Table API and how they relate to the
> new JIRAs.
>
> Best,
> Fabian
>
> 2017-01-26 7:05 GMT+01:00 Jark Wu <[hidden email]>:
>
> > Hi Fabian,
> >
> > I completely aggree with the six JIRAs and different runtime
> > implementations.
> > And I also aggree with @shaoxuan's proposal can work for both processing
> > time and event time.
> >
> > Hi Shaoxuan,
> >
> > I really like the idea you proposed that using retraction to decrease
> > computation.
> > It's a great optimization for incremental aggregation (only one reduced
> > value is kept).
> > But may not work for non-incremental aggregation (e.g. max, min, and
> > median) which
> > needs to buffer all the records in the group & window, and recalculate
> all
> > the records
> > when retraction happen. That means we will get a worse performance for
> > non-incremental
> >  aggregations when using retraction optimization here.
> >
> > IMO, we still need a general design for OVER window as following:
> >
> > 1. we buffer records in a list state (maybe sorted) for each group
> > 2. when an over window is up to trigger, create an accumulator and
> > accumulate all
> >     the records in the boundary of the list state.
> > 3. emit the aggregate result and delete the accumulator.
> >
> > And the retraction mechanism that keeps the accumulator for the whole
> life
> > without deleting, could be implemented as an optimization on it for
> > increamental aggregations.
> >
> > Regards, Jark
> >
> >
> > > 在 2017年1月26日,上午11:42,Shaoxuan Wang <[hidden email]> 写道:
> > >
> > > Yes Fabian,
> > > I will complete my design with more thorough thoughts. BTW, I think the
> > > incremental aggregate (the key point I suggested is to eliminate state
> > per
> > > each window) I proposed should work for both processing time and event
> > > time. It just does not need a sorted state for the processing time
> > > scenarios. (Need to verify).
> > >
> > > Regards,
> > > Shaoxuan
> > >
> > >
> > > On Wed, Jan 25, 2017 at 5:55 PM, Fabian Hueske <[hidden email]>
> > wrote:
> > >
> > >> Hi everybody,
> > >>
> > >> thanks for the great discussions so far. It's awesome to see so much
> > >> interest in this topic!
> > >>
> > >> First, I'd like to comment on the development process for this feature
> > and
> > >> later on the design of the runtime:
> > >>
> > >> Dev Process
> > >> ----
> > >> @Shaoxuan, I completely agree with you. We should first come up with
> > good
> > >> designs for the runtime operators of the different window types. Once
> we
> > >> have that, we can start implementing the operators and integrate them
> > with
> > >> Calcite's optimization. This will be an intermediate step and as a
> > >> byproduct give us support for SQL OVER windows. Once this is done, we
> > can
> > >> extend the Table API and translate the Table API calls into the same
> > >> RelNodes as Calcite's SQL parser does.
> > >>
> > >> Runtime Design
> > >> ----
> > >> I think it makes sense to distinguish the different types of OVER
> > windows
> > >> because they have different requirements which result in different
> > runtime
> > >> implementations (with different implementation complexity and
> > performance).
> > >> In a previous mail I proposed to split the support for OVER windows
> into
> > >> the following subtasks:
> > >>
> > >> # bounded PRECEDING
> > >> - OVER ROWS for processing time
> > >>  - does not require sorted state (data always arrives in processing
> time
> > >> order)
> > >>  - no need to consider retraction (processing time is never late)
> > >>  - defines windows on row count.
> > >>  - A GlobalWindow with evictor + trigger might be the best
> > implementation
> > >> (basically the same as DataStream.countWindow(long, long). We need to
> > add
> > >> timeouts to clean up state for non-used keys though.
> > >>
> > >> - OVER RANGE for processing time
> > >>  - does not require sorted state (data always arrives in processing
> time
> > >> order)
> > >>  - no need to consider retraction (processing time is never late)
> > >>  - defines windows on row count
> > >>  - I think this could also be implemented with a GlobalWindow with
> > evictor
> > >> + trigger (need to verify)
> > >>
> > >> - OVER RANGE for event time
> > >>  - need for sorted state (late data possible)
> > >>  - IMO, a ProcessFunction gives us the most flexibility in adding
> later
> > >> features (retraction, update rate, etc.)
> > >>  - @Shaoxuan, you sketched a good design. Would you like to continue
> > with
> > >> a design proposal?
> > >>
> > >> # UNBOUNDED PRECEDING
> > >> Similar considerations apply for the UNBOUNDED PRECEDING cases of the
> > above
> > >> window types.
> > >>
> > >> If we all agree that the separation into six JIRAs (bounded/unbounded
> *
> > >> row-pt/range-pt/ range-et) makes sense, I would suggest to move the
> > >> discussions about the design of the implementation to the individual
> > JIRAs.
> > >>
> > >> What do think?
> > >>
> > >> Best, Fabian
> > >>
> > >> 2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <[hidden email]>:
> > >>
> > >>> Hi Liuxinchun,
> > >>> I am not sure where did you get the inception: anyone has suggested
> "to
> > >>> process Event time window in Sliding Row Window". If you were
> referring
> > >> my
> > >>> post, there may be some misunderstanding there. I think you were
> asking
> > >> the
> > >>> similar question as Hongyuhong. I have just replied to him. Please
> > take a
> > >>> look and let me know if that makes sense to you. "Retraction" is an
> > >>> important building block to compute correct incremental results in
> > >>> streaming. It is another big topic, we should discuss this in another
> > >>> thread.
> > >>>
> > >>> Regards,
> > >>> Shaoxuan
> > >>>
> > >>>
> > >>>
> > >>> On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <[hidden email]>
> > >> wrote:
> > >>>
> > >>>> I don't think it is a good idea to process Event time window in
> > Sliding
> > >>>> Row Window. In Sliding Time window, when an element is late, we can
> > >>> trigger
> > >>>> the recalculation of the related windows. And the sliding period is
> > >>>> coarse-gained, We only need to recalculate size/sliding number of
> > >>> windows.
> > >>>> But in Sliding Row Window, the calculation is triggered when every
> > >>> element
> > >>>> is coming. The sliding period is becoming fine-gained. When an
> element
> > >> is
> > >>>> late, there are so many "windows" are influenced. Even if we store
> all
> > >>> the
> > >>>> raw data, the computation is very large.
> > >>>>
> > >>>> I think if it is possible to set a standard to sliding Event Time
> Row
> > >>>> Window, When certain elements are late, we can only recalculate
> > partial
> > >>>> windows and permit some error. For example, we can only recalculate
> > the
> > >>>> windows end in range between (lateElement.timestamp - leftDelta,
> > >>>> lateElement.timestamp] and those windows begin in range between
> > >>>> [lateElement.timestamp, lateElement.timestamp + rightDelta).
> > >>>> ////////////////////////////////////////////////////////////
> > >>>> //////////////////////////
> > >>>> Hi everyone,
> > >>>> Thanks for this great discussion, and glad to see more and more
> people
> > >>> are
> > >>>> interested on stream SQL & tableAPI.
> > >>>>
> > >>>> IMO, the key problems for Over window design are the SQL semantics
> and
> > >>> the
> > >>>> runtime design. I totally agree with Fabian that we should skip the
> > >>> design
> > >>>> of TumbleRows and SessionRows windows for now, as they are not well
> > >>> defined
> > >>>> in SQL semantics.
> > >>>>
> > >>>> Runtime design is the most crucial part we are interested in and
> > >>>> volunteered to contribute into. We have thousands of machines
> running
> > >>> flink
> > >>>> streaming jobs. The costs in terms of CPU, memory, and state are the
> > >>> vital
> > >>>> factors that we have to taken into account. We have been working on
> > the
> > >>>> design of OVER window in the past months, and planning to send out a
> > >>>> detailed design doc to DEV quite soon. But since Fabian started a
> good
> > >>>> discussion on OVER window, I would like to share our ideas/thoughts
> > >> about
> > >>>> the runtime design for OVER window.
> > >>>>
> > >>>>   1. As SunJincheng pointed out earlier, sliding window does not
> work
> > >>> for
> > >>>>   unbounded preceding, we need alternative approach for unbound over
> > >>>> window.
> > >>>>   2. Though sliding window may work for some cases of bounded
> window,
> > >>>>   it is not very efficient thereby should not be used for
> production.
> > >> To
> > >>>> the
> > >>>>   best of my understanding, the current runtime implementation of
> > >>> sliding
> > >>>>   window has not leveraged the concepts of state Panes yet. This
> means
> > >>>> that
> > >>>>   if we use sliding window for OVER window,  there will be a backend
> > >>> state
> > >>>>   created per each group (partition by) and each row, and whenever a
> > >> new
> > >>>>   record arrives, it will be accumulated to all the existing windows
> > >>> that
> > >>>> has
> > >>>>   not been closed. This would cause quite a lot of overhead in terms
> > >> of
> > >>>> both
> > >>>>   CPU and memory&state.
> > >>>>   3. Fabian has mentioned an approach of leveraging
> “ProcessFunction”
> > >>> and
> > >>>>   a “sortedState”. I like this idea. The design details on this are
> > >> not
> > >>>> quite
> > >>>>   clear yet. So I would like to add more thoughts on this.
> Regardless
> > >>>>   which dataStream API we are going to use (it is very likely that
> we
> > >>> need
> > >>>>   a new API), we should come out with an optimal approach. The
> purpose
> > >>> of
> > >>>>   grouping window and over window is to partition the data, such
> that
> > >> we
> > >>>> can
> > >>>>   generate the aggregate results. So when we talk about the design
> of
> > >>> OVER
> > >>>>   window, we have to think about the aggregates. As we proposed in
> our
> > >>>> recent
> > >>>>   UDAGG doc https://goo.gl/6ntclB,  the user defined accumulator
> will
> > >>> be
> > >>>>   stored in the aggregate state. Besides accumulator, we have also
> > >>>> introduced
> > >>>>   a retract API for UDAGG. With aggregate accumulator and retract
> > >> API, I
> > >>>> am
> > >>>>   proposing a runtime approach to implement the OVER window as
> > >>> followings.
> > >>>>   4.
> > >>>>      - We first implement a sorted state interface
> > >>>>      - Per each group, we just create one sorted state. When a new
> > >>> record
> > >>>>      arrives, it will insert into this sorted state, in the
> meanwhile
> > >> it
> > >>>> will be
> > >>>>      accumulated to the aggregate accumulator.
> > >>>>      - For over window, we keep the aggregate accumulator for the
> > >> entire
> > >>>>      job lifelong time. This is different than the case where we
> > >> delete
> > >>>> the
> > >>>>      accumulator for each group/window when a grouping-window is
> > >>> finished.
> > >>>>      - When an over window is up to trigger, we grab the
> > >>>>      previous accumulator from the state and accumulate values onto
> it
> > >>>> with all
> > >>>>      the records till the upperBoundary of the current window, and
> > >>>> retract all
> > >>>>      the out of scope records till its lowerBoundary. We emit the
> > >>>>      aggregate result and save the accumulator for the next window.
> > >>>>
> > >>>>
> > >>>> Hello Fabian,
> > >>>> I would suggest we should first start working on runtime design of
> > over
> > >>>> window and aggregate. Once we have a good design there, one can
> easily
> > >>> add
> > >>>> the support for SQL as well as tableAPI. What do you think?
> > >>>>
> > >>>> Regards,
> > >>>> Shaoxuan
> > >>>>
> > >>>> On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <[hidden email]>
> > >>> wrote:
> > >>>>
> > >>>>> Hi Radu,
> > >>>>>
> > >>>>> thanks for your comments!
> > >>>>>
> > >>>>> Yes, my intention is to open new JIRA issues to structure the
> > >>>>> development process. Everybody is very welcome to pick up issues
> and
> > >>>>> discuss the design proposals.
> > >>>>> At the moment I see the following six issues to start with:
> > >>>>>
> > >>>>> - streaming SQL OVER ROW for processing time
> > >>>>>  - bounded PRECEDING
> > >>>>>  - unbounded PRECEDING
> > >>>>>
> > >>>>> - streaming SQL OVER RANGE for processing time
> > >>>>>  - bounded PRECEDING
> > >>>>>  - unbounded PRECEDING
> > >>>>>
> > >>>>> - streaming SQL OVER RANGE for event time
> > >>>>>  - bounded PRECEDING
> > >>>>>  - unbounded PRECEDING
> > >>>>>
> > >>>>> For each of these windows we need corresponding translation rules
> and
> > >>>>> execution code.
> > >>>>>
> > >>>>> Subsequent JIRAs would be
> > >>>>> - extending the Table API for supported SQL windows
> > >>>>> - add support for FOLLOWING
> > >>>>> - etc.
> > >>>>>
> > >>>>> Regarding the requirement for a sorted state. I am not sure if the
> > >>>>> OVER windows should be implemented using Flink's DataStream window
> > >>>> framework.
> > >>>>> We need a good design document to figure out what is the best
> > >>>>> approach. A ProcessFunction with a sorted state might be a good
> > >>> solution
> > >>>> as well.
> > >>>>>
> > >>>>> Best, Fabian
> > >>>>>
> > >>>>>
> > >>>>> 2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]>:
> > >>>>>
> > >>>>>> Hi all,
> > >>>>>>
> > >>>>>> Thanks for starting these discussion - it is very useful.
> > >>>>>> It does make sense indeed to refactor all these and coordinate a
> > >> bit
> > >>>>>> the efforts not to have overlapping implementations and
> > >> incompatible
> > >>>>> solutions.
> > >>>>>>
> > >>>>>> If you close the 3 jira issues you mentioned - do you plan to
> > >>>>>> redesign them and open new ones? Do you need help from our side -
> > >> we
> > >>>>>> can also pick the redesign of some of these new jira issues. For
> > >>>>>> example we already
> > >>>>> have
> > >>>>>> an implementation for this and we can help with the design.
> > >>>>>> Nevertheless, let's coordinate the effort.
> > >>>>>>
> > >>>>>> Regarding the support for the different types of window - I think
> > >>>>>> the
> > >>>>> best
> > >>>>>> option is to split the implementation in small units. We can
> easily
> > >>>>>> do
> > >>>>> this
> > >>>>>> from the transformation rule class and with this each particular
> > >>>>>> type of window (session/sliding/sliderows/processing time/...)
> > >> will
> > >>>>>> have a clear implementation and a corresponding architecture
> within
> > >>>> the jira issue?
> > >>>>> What
> > >>>>>> do you think about such a granularity?
> > >>>>>>
> > >>>>>> Regarding the issue of " Q4: The implementaion of SlideRows still
> > >>>>>> need a custom operator that collects records in a priority queue
> > >>>>>> ordered by the "rowtime", which is similar to the design we
> > >>>>>> discussed in FLINK-4697, right? "
> > >>>>>> Why would you need this operator? The window buffer can act to
> some
> > >>>>> extent
> > >>>>>> as a priority queue as long as the trigger and evictor is set to
> > >>>>>> work
> > >>>>> based
> > >>>>>> on the rowtime - or maybe I am missing something... Can you please
> > >>>>> clarify
> > >>>>>> this.
> > >>>>>>
> > >>>>>>
> > >>>>>> Dr. Radu Tudoran
> > >>>>>> Senior Research Engineer - Big Data Expert IT R&D Division
> > >>>>>>
> > >>>>>>
> > >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > >>>>>> European Research Center
> > >>>>>> Riesstrasse 25, 80992 München
> > >>>>>>
> > >>>>>> E-mail: [hidden email]
> > >>>>>> Mobile: +49 15209084330
> > >>>>>> Telephone: +49 891588344173
> > >>>>>>
> > >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > >>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > >>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB
> > >> 56063,
> > >>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> > >>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB
> > >> 56063,
> > >>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> > >>>>>> This e-mail and its attachments contain confidential information
> > >> from
> > >>>>>> HUAWEI, which is intended only for the person or entity whose
> > >> address
> > >>>> is
> > >>>>>> listed above. Any use of the information contained herein in any
> > >> way
> > >>>>>> (including, but not limited to, total or partial disclosure,
> > >>>>> reproduction,
> > >>>>>> or dissemination) by persons other than the intended recipient(s)
> > >> is
> > >>>>>> prohibited. If you receive this e-mail in error, please notify the
> > >>>> sender
> > >>>>>> by phone or email immediately and delete it!
> > >>>>>>
> > >>>>>>
> > >>>>>> -----Original Message-----
> > >>>>>> From: Jark Wu [mailto:[hidden email]]
> > >>>>>> Sent: Tuesday, January 24, 2017 6:53 AM
> > >>>>>> To: [hidden email]
> > >>>>>> Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
> > >>> Windows
> > >>>>> for
> > >>>>>> streaming tables
> > >>>>>>
> > >>>>>> Hi Fabian,
> > >>>>>>
> > >>>>>> Thanks for bringing up this discussion and the nice approach to
> > >> avoid
> > >>>>>> overlapping contributions.
> > >>>>>>
> > >>>>>> All of these make sense to me. But I have some questions.
> > >>>>>>
> > >>>>>> Q1: If I understand correctly, we will not support TumbleRows and
> > >>>>>> SessionRows at the beginning. But maybe support them as a syntax
> > >>> sugar
> > >>>>> (in
> > >>>>>> Table API) when the SlideRows is supported in the future. Right ?
> > >>>>>>
> > >>>>>> Q2: How to support SessionRows based on SlideRows ?  I don't get
> > >> how
> > >>> to
> > >>>>>> partition on "gap-separated".
> > >>>>>>
> > >>>>>> Q3: Should we break down the approach into smaller tasks for
> > >>> streaming
> > >>>>>> tables and batch tables ?
> > >>>>>>
> > >>>>>> Q4: The implementaion of SlideRows still need a custom operator
> > >> that
> > >>>>>> collects records in a priority queue ordered by the "rowtime",
> > >> which
> > >>> is
> > >>>>>> similar to the design we discussed in FLINK-4697, right?
> > >>>>>>
> > >>>>>> +1 not support for OVER ROW for event time at this point.
> > >>>>>>
> > >>>>>> Regards, Jark
> > >>>>>>
> > >>>>>>
> > >>>>>>> 在 2017年1月24日,上午10:28,Hongyuhong <[hidden email]> 写道:
> > >>>>>>>
> > >>>>>>> Hi,
> > >>>>>>> We are also interested in streaming sql and very willing to
> > >>>> participate
> > >>>>>> and contribute.
> > >>>>>>>
> > >>>>>>> We are now in progress and we will also contribute to calcite to
> > >>> push
> > >>>>>> forward the window and stream-join support.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> --------------
> > >>>>>>> Sender: Fabian Hueske [mailto:[hidden email]] Send Time:
> > >>>> 2017年1月24日
> > >>>>>>> 5:55
> > >>>>>>> Receiver: [hidden email]
> > >>>>>>> Theme: Re: [DISCUSS] Development of SQL OVER / Table API Row
> > >>> Windows
> > >>>>>>> for streaming tables
> > >>>>>>>
> > >>>>>>> Hi Haohui,
> > >>>>>>>
> > >>>>>>> our plan was in fact to piggy-back on Calcite and use the TUMBLE
> > >>>>>> function [1] once is it is available (CALCITE-1345 [2]).
> > >>>>>>> Unfortunately, this issue does not seem to be very active, so I
> > >>> don't
> > >>>>>> know what the progress is.
> > >>>>>>>
> > >>>>>>> I would suggest to move the discussion about group windows to a
> > >>>>> separate
> > >>>>>> thread and keep this one focused on the organization of the SQL
> > >> OVER
> > >>>>>> windows.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Fabian
> > >>>>>>>
> > >>>>>>> [1] http://calcite.apache.org/docs/stream.html)
> > >>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > >>>>>>>
> > >>>>>>> 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
> > >>>>>>>
> > >>>>>>>> Hi Fabian,
> > >>>>>>>>
> > >>>>>>>> FLINK-4692 has added the support for tumbling window and we are
> > >>>>>>>> excited to try it out and expose it as a SQL construct.
> > >>>>>>>>
> > >>>>>>>> Just curious -- what's your thought on the SQL syntax on
> > >> tumbling
> > >>>>>> window?
> > >>>>>>>>
> > >>>>>>>> Implementation wise it might make sense to think tumbling window
> > >>> as
> > >>>> a
> > >>>>>>>> special case of the sliding window.
> > >>>>>>>>
> > >>>>>>>> The problem I see is that the OVER construct might be
> > >> insufficient
> > >>>> to
> > >>>>>>>> support all the use cases of tumbling windows. For example, it
> > >>> fails
> > >>>>>>>> to express tumbling windows that have fractional time units (as
> > >>>>>>>> pointed out in http://calcite.apache.org/docs/stream.html).
> > >>>>>>>>
> > >>>>>>>> It looks to me that the Calcite / Azure Stream Analytics have
> > >>>>>>>> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address
> > >>> this
> > >>>>>> issue.
> > >>>>>>>>
> > >>>>>>>> Do you think it is a good idea to follow the same conventions?
> > >>> Your
> > >>>>>>>> ideas are appreciated.
> > >>>>>>>>
> > >>>>>>>> Regards,
> > >>>>>>>> Haohui
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <[hidden email]>
> > >>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> +1
> > >>>>>>>>>
> > >>>>>>>>> We are also quite interested in these features and would love
> > >> to
> > >>>>>>>>> participate and contribute.
> > >>>>>>>>>
> > >>>>>>>>> ~Haohui
> > >>>>>>>>>
> > >>>>>>>>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <
> > >> [hidden email]
> > >>>>
> > >>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi everybody,
> > >>>>>>>>>>
> > >>>>>>>>>> it seems that currently several contributors are working on
> > >> new
> > >>>>>>>>>> features for the streaming Table API / SQL around row windows
> > >>> (as
> > >>>>>>>>>> defined in
> > >>>>>>>>>> FLIP-11
> > >>>>>>>>>> [1]) and SQL OVER-style window (FLINK-4678, FLINK-4679,
> > >>>> FLINK-4680,
> > >>>>>>>>>> FLINK-5584).
> > >>>>>>>>>> Since these efforts overlap quite a bit I spent some time
> > >>> thinking
> > >>>>>>>>>> about how we can approach these features and how to avoid
> > >>>>>>>>>> overlapping contributions.
> > >>>>>>>>>>
> > >>>>>>>>>> The challenge here is the following. Some of the Table API row
> > >>>>>>>>>> windows
> > >>>>>>>> as
> > >>>>>>>>>> defined by FLIP-11 [1] are basically SQL OVER windows while
> > >>> other
> > >>>>>>>>>> cannot be easily expressed as such (TumbleRows for row-count
> > >>>>>>>>>> intervals, SessionRows).
> > >>>>>>>>>> However, since Calcite already supports SQL OVER windows, we
> > >> can
> > >>>>>>>>>> reuse
> > >>>>>>>> the
> > >>>>>>>>>> optimization logic for some of the Table API row windows. I
> > >> also
> > >>>>>>>>>> thought about the semantics of the TumbleRows and SessionRows
> > >>>>>>>>>> windows as defined in
> > >>>>>>>>>> FLIP-11 and came to the conclusion that these are not well
> > >>> defined
> > >>>>>>>>>> in
> > >>>>>>>>>> FLIP-11 and should rather be defined as SlideRows windows
> > >> with a
> > >>>>>>>>>> special PARTITION BY clause.
> > >>>>>>>>>>
> > >>>>>>>>>> I propose to approach SQL OVER windows and Table API row
> > >> windows
> > >>>> as
> > >>>>>>>>>> follows:
> > >>>>>>>>>>
> > >>>>>>>>>> We start with three simple cases for SQL OVER windows (not
> > >> Table
> > >>>>>>>>>> API
> > >>>>>>>> yet):
> > >>>>>>>>>>
> > >>>>>>>>>> * OVER RANGE for event time
> > >>>>>>>>>> * OVER RANGE for processing time
> > >>>>>>>>>> * OVER ROW for processing time
> > >>>>>>>>>>
> > >>>>>>>>>> All cases fulfill the following restrictions:
> > >>>>>>>>>> - All aggregations in SELECT must refer to the same window.
> > >>>>>>>>>> - PARTITION BY may not contain the rowtime attribute.
> > >>>>>>>>>> - ORDER BY must be on rowtime attribute (for event time) or
> > >> on a
> > >>>>>>>>>> marker function that indicates processing time. Additional
> > >> sort
> > >>>>>>>>>> attributes are not supported initially.
> > >>>>>>>>>> - only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and
> > >>> "BETWEEN
> > >>>> x
> > >>>>>>>>>> PRECEDING AND CURRENT ROW" are supported.
> > >>>>>>>>>>
> > >>>>>>>>>> OVER ROW for event time cannot be easily supported. With event
> > >>>>>>>>>> time, we may have late records which need to be injected into
> > >>> the
> > >>>>>>>>>> order of records.
> > >>>>>>>>>> When
> > >>>>>>>>>> a record in injected in to the order where a row-count window
> > >>> has
> > >>>>>>>> already
> > >>>>>>>>>> been computed, this and all following windows will change. We
> > >>>> could
> > >>>>>>>> either
> > >>>>>>>>>> drop the record or sent out many retraction records. I think
> > >> it
> > >>> is
> > >>>>>>>>>> best
> > >>>>>>>> to
> > >>>>>>>>>> not open this can of worms at this point.
> > >>>>>>>>>>
> > >>>>>>>>>> The rational for all of the above restrictions is to have
> > >> first
> > >>>>>>>>>> versions of OVER windows soon.
> > >>>>>>>>>> Once we have the above cases covered we can extend and remove
> > >>>>>>>> limitations
> > >>>>>>>>>> as follows:
> > >>>>>>>>>>
> > >>>>>>>>>> - Table API SlideRow windows (with the same restrictions as
> > >>>> above).
> > >>>>>>>>>> This will be mostly API work since the execution part has been
> > >>>>> solved
> > >>>>>> before.
> > >>>>>>>>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> > >>>>>>>>>> - Add support for different windows in SELECT. All windows
> > >> must
> > >>> be
> > >>>>>>>>>> partitioned and ordered in the same way.
> > >>>>>>>>>> - Add support for additional ORDER BY attributes (besides
> > >> time).
> > >>>>>>>>>>
> > >>>>>>>>>> As I said before, TumbleRows and SessionRows windows as in
> > >>> FLIP-11
> > >>>>>>>>>> are
> > >>>>>>>> not
> > >>>>>>>>>> well defined, IMO.
> > >>>>>>>>>> They can be expressed as SlideRows windows with special
> > >>>>>>>>>> partitioning (partitioning on fixed, non-overlapping time
> > >> ranges
> > >>>>>>>>>> for TumbleRows, and gap-separated, non-overlapping time ranges
> > >>> for
> > >>>>>>>>>> SessionRows) I would not start to work on those yet.
> > >>>>>>>>>>
> > >>>>>>>>>> I would like to close all related JIRA issues (FLINK-4678,
> > >>>>>>>>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the
> > >>>> development
> > >>>>>>>>>> of these
> > >>>>>>>> features
> > >>>>>>>>>> as outlined above with corresponding JIRA issues.
> > >>>>>>>>>>
> > >>>>>>>>>> What do others think? (I cc'ed the contributors assigned to
> > >> the
> > >>>>>>>>>> above
> > >>>>>>>> JIRA
> > >>>>>>>>>> issues)
> > >>>>>>>>>>
> > >>>>>>>>>> Best, Fabian
> > >>>>>>>>>>
> > >>>>>>>>>> [1]
> > >>>>>>>>>>
> > >>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> > >>>>>>>> 11%3A+Table+API+Stream+Aggregations
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Fabian Hueske-2
Hi Shaoxuan,

I think you are right.
The UDAGG interface allows to collect all input values in the accumulation
buffer, so values can also be removed from there in case of an retraction.
This makes all functions retractable at the cost of materializing a
potentially large accumulation buffer, but that's basically the same as a
generic WindowFunction would do.
Moreover, the UDAGG interface allows to immediately remove retracted values
which would not be possible with a WindowFunction.

Best, Fabian

2017-02-06 16:04 GMT+01:00 Shaoxuan Wang <[hidden email]>:

>  Sorry for the late response.
>
> Hi Jark,
> Thanks for raising a good question - my proposal “may not work for
> non-incremental aggregation (e.g. max, min, and median)”, but I have
> some different opinions.
> Yes, I have proposed a concept of “accumulate on getValue” in my UDAGG
> proposal https://goo.gl/6ntclB. But after giving this some thorough
> thought, I think this concept is unfortunately not necessary. All
> aggregates should be suitable for incremental aggregate (even for max, min
> and median). One can choose to accumulate all records at the same time when
> the window is completed. But it will still execute the accumulate method to
> update the accumulator state for each record. The way it executes
> accumulate function to accumulate each record already implies that
> the aggregation is incremental. Whether it is accumulated once at each
> record arrival (incremental) or accumulated all records when the window is
> completed (non-incremental), really does not matter in terms of the
> correctness and the complexity. On the other hand, the non-incremental
> approach will introduce CPU jitter and latency overhead, so I would like to
> propose to always apply incremental mode for all streaming aggregations.
>
> Fabian,
> Yes, my proposal will not work for the aggregate if it does not have
> retract function. But I believe we can always implement the retract
> functions (as it is just the opposite operation to accumulate, i.e. the
> retract will exist if an accumulate exists) for all aggregates. If this
> makes sense to all of you, I would like to propose that UDAGG should be
> forced to provision the retract function.
>
> What do you think? I have updated my UDAGG proposal and UDAGG Jira, we can
> move the discussion there if you think that is more appropriate.
>
> Regards,
> Shaoxuan
>
>
> On Thu, Jan 26, 2017 at 10:14 PM, Fabian Hueske <[hidden email]> wrote:
>
> > Hi everybody,
> >
> > I created the following JIRAs:
> >
> > - FLINK-5653: processing time OVER ROWS x PRECEDING
> > - FLINK-5654: processing time OVER RANGE x PRECEDING
> > - FLINK-5655: event time OVER RANGE x PRECEDING
> >
> > - FLINK-5656: processing time OVER ROWS UNBOUNDED PRECEDING
> > - FLINK-5657: processing time OVER RANGE UNBOUNDED PRECEDING
> > - FLINK-5658: event time OVER RANGE UNBOUNDED PRECEDING
> >
> > Let's move the discussions about the design of the runtime operators to
> > these issues.
> >
> > Since some of you have already started working on some of the issues, it
> > would be good if you could pick the ones you plan to work on.
> > If there are overlapping interests, it would be great to collaborate,
> e.g,
> > design, coding, testing, code review.
> >
> > A few more comments:
> >
> > @Shaoxuan: You are right, we can implement the processing time windows
> with
> > ProcessFunction as well. A GlobalWindow is essentially a FIFO queue of
> > arriving records. With custom triggers and evictors, we could implement
> the
> > functionality of the processing time OVER windows. We could ask Aljoscha
> > (he knows every detail of Flink's window framework) if a ProcessFunction
> > has more optimization potential than a GlobalWindow.
> >
> > @Jark: That's a good point. We need logic to compute non-retractable
> > aggregation functions as well.
> >
> > @Radu: So far we had very coarse grained DataStreamRelNodes (e.g.,
> > DataStreamAggregate implements tumbling, sliding, and session windows for
> > processing and event time). However, it might make sense to start
> > implementing more fine-grained DataStreamRelNodes.
> >
> > I'll go ahead and modify the previous JIRAs about SlideRow, TumbleRow,
> and
> > SessionRow to explicitly address the Table API and how they relate to the
> > new JIRAs.
> >
> > Best,
> > Fabian
> >
> > 2017-01-26 7:05 GMT+01:00 Jark Wu <[hidden email]>:
> >
> > > Hi Fabian,
> > >
> > > I completely aggree with the six JIRAs and different runtime
> > > implementations.
> > > And I also aggree with @shaoxuan's proposal can work for both
> processing
> > > time and event time.
> > >
> > > Hi Shaoxuan,
> > >
> > > I really like the idea you proposed that using retraction to decrease
> > > computation.
> > > It's a great optimization for incremental aggregation (only one reduced
> > > value is kept).
> > > But may not work for non-incremental aggregation (e.g. max, min, and
> > > median) which
> > > needs to buffer all the records in the group & window, and recalculate
> > all
> > > the records
> > > when retraction happen. That means we will get a worse performance for
> > > non-incremental
> > >  aggregations when using retraction optimization here.
> > >
> > > IMO, we still need a general design for OVER window as following:
> > >
> > > 1. we buffer records in a list state (maybe sorted) for each group
> > > 2. when an over window is up to trigger, create an accumulator and
> > > accumulate all
> > >     the records in the boundary of the list state.
> > > 3. emit the aggregate result and delete the accumulator.
> > >
> > > And the retraction mechanism that keeps the accumulator for the whole
> > life
> > > without deleting, could be implemented as an optimization on it for
> > > increamental aggregations.
> > >
> > > Regards, Jark
> > >
> > >
> > > > 在 2017年1月26日,上午11:42,Shaoxuan Wang <[hidden email]> 写道:
> > > >
> > > > Yes Fabian,
> > > > I will complete my design with more thorough thoughts. BTW, I think
> the
> > > > incremental aggregate (the key point I suggested is to eliminate
> state
> > > per
> > > > each window) I proposed should work for both processing time and
> event
> > > > time. It just does not need a sorted state for the processing time
> > > > scenarios. (Need to verify).
> > > >
> > > > Regards,
> > > > Shaoxuan
> > > >
> > > >
> > > > On Wed, Jan 25, 2017 at 5:55 PM, Fabian Hueske <[hidden email]>
> > > wrote:
> > > >
> > > >> Hi everybody,
> > > >>
> > > >> thanks for the great discussions so far. It's awesome to see so much
> > > >> interest in this topic!
> > > >>
> > > >> First, I'd like to comment on the development process for this
> feature
> > > and
> > > >> later on the design of the runtime:
> > > >>
> > > >> Dev Process
> > > >> ----
> > > >> @Shaoxuan, I completely agree with you. We should first come up with
> > > good
> > > >> designs for the runtime operators of the different window types.
> Once
> > we
> > > >> have that, we can start implementing the operators and integrate
> them
> > > with
> > > >> Calcite's optimization. This will be an intermediate step and as a
> > > >> byproduct give us support for SQL OVER windows. Once this is done,
> we
> > > can
> > > >> extend the Table API and translate the Table API calls into the same
> > > >> RelNodes as Calcite's SQL parser does.
> > > >>
> > > >> Runtime Design
> > > >> ----
> > > >> I think it makes sense to distinguish the different types of OVER
> > > windows
> > > >> because they have different requirements which result in different
> > > runtime
> > > >> implementations (with different implementation complexity and
> > > performance).
> > > >> In a previous mail I proposed to split the support for OVER windows
> > into
> > > >> the following subtasks:
> > > >>
> > > >> # bounded PRECEDING
> > > >> - OVER ROWS for processing time
> > > >>  - does not require sorted state (data always arrives in processing
> > time
> > > >> order)
> > > >>  - no need to consider retraction (processing time is never late)
> > > >>  - defines windows on row count.
> > > >>  - A GlobalWindow with evictor + trigger might be the best
> > > implementation
> > > >> (basically the same as DataStream.countWindow(long, long). We need
> to
> > > add
> > > >> timeouts to clean up state for non-used keys though.
> > > >>
> > > >> - OVER RANGE for processing time
> > > >>  - does not require sorted state (data always arrives in processing
> > time
> > > >> order)
> > > >>  - no need to consider retraction (processing time is never late)
> > > >>  - defines windows on row count
> > > >>  - I think this could also be implemented with a GlobalWindow with
> > > evictor
> > > >> + trigger (need to verify)
> > > >>
> > > >> - OVER RANGE for event time
> > > >>  - need for sorted state (late data possible)
> > > >>  - IMO, a ProcessFunction gives us the most flexibility in adding
> > later
> > > >> features (retraction, update rate, etc.)
> > > >>  - @Shaoxuan, you sketched a good design. Would you like to continue
> > > with
> > > >> a design proposal?
> > > >>
> > > >> # UNBOUNDED PRECEDING
> > > >> Similar considerations apply for the UNBOUNDED PRECEDING cases of
> the
> > > above
> > > >> window types.
> > > >>
> > > >> If we all agree that the separation into six JIRAs
> (bounded/unbounded
> > *
> > > >> row-pt/range-pt/ range-et) makes sense, I would suggest to move the
> > > >> discussions about the design of the implementation to the individual
> > > JIRAs.
> > > >>
> > > >> What do think?
> > > >>
> > > >> Best, Fabian
> > > >>
> > > >> 2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <[hidden email]>:
> > > >>
> > > >>> Hi Liuxinchun,
> > > >>> I am not sure where did you get the inception: anyone has suggested
> > "to
> > > >>> process Event time window in Sliding Row Window". If you were
> > referring
> > > >> my
> > > >>> post, there may be some misunderstanding there. I think you were
> > asking
> > > >> the
> > > >>> similar question as Hongyuhong. I have just replied to him. Please
> > > take a
> > > >>> look and let me know if that makes sense to you. "Retraction" is an
> > > >>> important building block to compute correct incremental results in
> > > >>> streaming. It is another big topic, we should discuss this in
> another
> > > >>> thread.
> > > >>>
> > > >>> Regards,
> > > >>> Shaoxuan
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <[hidden email]
> >
> > > >> wrote:
> > > >>>
> > > >>>> I don't think it is a good idea to process Event time window in
> > > Sliding
> > > >>>> Row Window. In Sliding Time window, when an element is late, we
> can
> > > >>> trigger
> > > >>>> the recalculation of the related windows. And the sliding period
> is
> > > >>>> coarse-gained, We only need to recalculate size/sliding number of
> > > >>> windows.
> > > >>>> But in Sliding Row Window, the calculation is triggered when every
> > > >>> element
> > > >>>> is coming. The sliding period is becoming fine-gained. When an
> > element
> > > >> is
> > > >>>> late, there are so many "windows" are influenced. Even if we store
> > all
> > > >>> the
> > > >>>> raw data, the computation is very large.
> > > >>>>
> > > >>>> I think if it is possible to set a standard to sliding Event Time
> > Row
> > > >>>> Window, When certain elements are late, we can only recalculate
> > > partial
> > > >>>> windows and permit some error. For example, we can only
> recalculate
> > > the
> > > >>>> windows end in range between (lateElement.timestamp - leftDelta,
> > > >>>> lateElement.timestamp] and those windows begin in range between
> > > >>>> [lateElement.timestamp, lateElement.timestamp + rightDelta).
> > > >>>> ////////////////////////////////////////////////////////////
> > > >>>> //////////////////////////
> > > >>>> Hi everyone,
> > > >>>> Thanks for this great discussion, and glad to see more and more
> > people
> > > >>> are
> > > >>>> interested on stream SQL & tableAPI.
> > > >>>>
> > > >>>> IMO, the key problems for Over window design are the SQL semantics
> > and
> > > >>> the
> > > >>>> runtime design. I totally agree with Fabian that we should skip
> the
> > > >>> design
> > > >>>> of TumbleRows and SessionRows windows for now, as they are not
> well
> > > >>> defined
> > > >>>> in SQL semantics.
> > > >>>>
> > > >>>> Runtime design is the most crucial part we are interested in and
> > > >>>> volunteered to contribute into. We have thousands of machines
> > running
> > > >>> flink
> > > >>>> streaming jobs. The costs in terms of CPU, memory, and state are
> the
> > > >>> vital
> > > >>>> factors that we have to taken into account. We have been working
> on
> > > the
> > > >>>> design of OVER window in the past months, and planning to send
> out a
> > > >>>> detailed design doc to DEV quite soon. But since Fabian started a
> > good
> > > >>>> discussion on OVER window, I would like to share our
> ideas/thoughts
> > > >> about
> > > >>>> the runtime design for OVER window.
> > > >>>>
> > > >>>>   1. As SunJincheng pointed out earlier, sliding window does not
> > work
> > > >>> for
> > > >>>>   unbounded preceding, we need alternative approach for unbound
> over
> > > >>>> window.
> > > >>>>   2. Though sliding window may work for some cases of bounded
> > window,
> > > >>>>   it is not very efficient thereby should not be used for
> > production.
> > > >> To
> > > >>>> the
> > > >>>>   best of my understanding, the current runtime implementation of
> > > >>> sliding
> > > >>>>   window has not leveraged the concepts of state Panes yet. This
> > means
> > > >>>> that
> > > >>>>   if we use sliding window for OVER window,  there will be a
> backend
> > > >>> state
> > > >>>>   created per each group (partition by) and each row, and
> whenever a
> > > >> new
> > > >>>>   record arrives, it will be accumulated to all the existing
> windows
> > > >>> that
> > > >>>> has
> > > >>>>   not been closed. This would cause quite a lot of overhead in
> terms
> > > >> of
> > > >>>> both
> > > >>>>   CPU and memory&state.
> > > >>>>   3. Fabian has mentioned an approach of leveraging
> > “ProcessFunction”
> > > >>> and
> > > >>>>   a “sortedState”. I like this idea. The design details on this
> are
> > > >> not
> > > >>>> quite
> > > >>>>   clear yet. So I would like to add more thoughts on this.
> > Regardless
> > > >>>>   which dataStream API we are going to use (it is very likely that
> > we
> > > >>> need
> > > >>>>   a new API), we should come out with an optimal approach. The
> > purpose
> > > >>> of
> > > >>>>   grouping window and over window is to partition the data, such
> > that
> > > >> we
> > > >>>> can
> > > >>>>   generate the aggregate results. So when we talk about the design
> > of
> > > >>> OVER
> > > >>>>   window, we have to think about the aggregates. As we proposed in
> > our
> > > >>>> recent
> > > >>>>   UDAGG doc https://goo.gl/6ntclB,  the user defined accumulator
> > will
> > > >>> be
> > > >>>>   stored in the aggregate state. Besides accumulator, we have also
> > > >>>> introduced
> > > >>>>   a retract API for UDAGG. With aggregate accumulator and retract
> > > >> API, I
> > > >>>> am
> > > >>>>   proposing a runtime approach to implement the OVER window as
> > > >>> followings.
> > > >>>>   4.
> > > >>>>      - We first implement a sorted state interface
> > > >>>>      - Per each group, we just create one sorted state. When a new
> > > >>> record
> > > >>>>      arrives, it will insert into this sorted state, in the
> > meanwhile
> > > >> it
> > > >>>> will be
> > > >>>>      accumulated to the aggregate accumulator.
> > > >>>>      - For over window, we keep the aggregate accumulator for the
> > > >> entire
> > > >>>>      job lifelong time. This is different than the case where we
> > > >> delete
> > > >>>> the
> > > >>>>      accumulator for each group/window when a grouping-window is
> > > >>> finished.
> > > >>>>      - When an over window is up to trigger, we grab the
> > > >>>>      previous accumulator from the state and accumulate values
> onto
> > it
> > > >>>> with all
> > > >>>>      the records till the upperBoundary of the current window, and
> > > >>>> retract all
> > > >>>>      the out of scope records till its lowerBoundary. We emit the
> > > >>>>      aggregate result and save the accumulator for the next
> window.
> > > >>>>
> > > >>>>
> > > >>>> Hello Fabian,
> > > >>>> I would suggest we should first start working on runtime design of
> > > over
> > > >>>> window and aggregate. Once we have a good design there, one can
> > easily
> > > >>> add
> > > >>>> the support for SQL as well as tableAPI. What do you think?
> > > >>>>
> > > >>>> Regards,
> > > >>>> Shaoxuan
> > > >>>>
> > > >>>> On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <
> [hidden email]>
> > > >>> wrote:
> > > >>>>
> > > >>>>> Hi Radu,
> > > >>>>>
> > > >>>>> thanks for your comments!
> > > >>>>>
> > > >>>>> Yes, my intention is to open new JIRA issues to structure the
> > > >>>>> development process. Everybody is very welcome to pick up issues
> > and
> > > >>>>> discuss the design proposals.
> > > >>>>> At the moment I see the following six issues to start with:
> > > >>>>>
> > > >>>>> - streaming SQL OVER ROW for processing time
> > > >>>>>  - bounded PRECEDING
> > > >>>>>  - unbounded PRECEDING
> > > >>>>>
> > > >>>>> - streaming SQL OVER RANGE for processing time
> > > >>>>>  - bounded PRECEDING
> > > >>>>>  - unbounded PRECEDING
> > > >>>>>
> > > >>>>> - streaming SQL OVER RANGE for event time
> > > >>>>>  - bounded PRECEDING
> > > >>>>>  - unbounded PRECEDING
> > > >>>>>
> > > >>>>> For each of these windows we need corresponding translation rules
> > and
> > > >>>>> execution code.
> > > >>>>>
> > > >>>>> Subsequent JIRAs would be
> > > >>>>> - extending the Table API for supported SQL windows
> > > >>>>> - add support for FOLLOWING
> > > >>>>> - etc.
> > > >>>>>
> > > >>>>> Regarding the requirement for a sorted state. I am not sure if
> the
> > > >>>>> OVER windows should be implemented using Flink's DataStream
> window
> > > >>>> framework.
> > > >>>>> We need a good design document to figure out what is the best
> > > >>>>> approach. A ProcessFunction with a sorted state might be a good
> > > >>> solution
> > > >>>> as well.
> > > >>>>>
> > > >>>>> Best, Fabian
> > > >>>>>
> > > >>>>>
> > > >>>>> 2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]
> >:
> > > >>>>>
> > > >>>>>> Hi all,
> > > >>>>>>
> > > >>>>>> Thanks for starting these discussion - it is very useful.
> > > >>>>>> It does make sense indeed to refactor all these and coordinate a
> > > >> bit
> > > >>>>>> the efforts not to have overlapping implementations and
> > > >> incompatible
> > > >>>>> solutions.
> > > >>>>>>
> > > >>>>>> If you close the 3 jira issues you mentioned - do you plan to
> > > >>>>>> redesign them and open new ones? Do you need help from our side
> -
> > > >> we
> > > >>>>>> can also pick the redesign of some of these new jira issues. For
> > > >>>>>> example we already
> > > >>>>> have
> > > >>>>>> an implementation for this and we can help with the design.
> > > >>>>>> Nevertheless, let's coordinate the effort.
> > > >>>>>>
> > > >>>>>> Regarding the support for the different types of window - I
> think
> > > >>>>>> the
> > > >>>>> best
> > > >>>>>> option is to split the implementation in small units. We can
> > easily
> > > >>>>>> do
> > > >>>>> this
> > > >>>>>> from the transformation rule class and with this each particular
> > > >>>>>> type of window (session/sliding/sliderows/processing time/...)
> > > >> will
> > > >>>>>> have a clear implementation and a corresponding architecture
> > within
> > > >>>> the jira issue?
> > > >>>>> What
> > > >>>>>> do you think about such a granularity?
> > > >>>>>>
> > > >>>>>> Regarding the issue of " Q4: The implementaion of SlideRows
> still
> > > >>>>>> need a custom operator that collects records in a priority queue
> > > >>>>>> ordered by the "rowtime", which is similar to the design we
> > > >>>>>> discussed in FLINK-4697, right? "
> > > >>>>>> Why would you need this operator? The window buffer can act to
> > some
> > > >>>>> extent
> > > >>>>>> as a priority queue as long as the trigger and evictor is set to
> > > >>>>>> work
> > > >>>>> based
> > > >>>>>> on the rowtime - or maybe I am missing something... Can you
> please
> > > >>>>> clarify
> > > >>>>>> this.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Dr. Radu Tudoran
> > > >>>>>> Senior Research Engineer - Big Data Expert IT R&D Division
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > >>>>>> European Research Center
> > > >>>>>> Riesstrasse 25, 80992 München
> > > >>>>>>
> > > >>>>>> E-mail: [hidden email]
> > > >>>>>> Mobile: +49 15209084330
> > > >>>>>> Telephone: +49 891588344173
> > > >>>>>>
> > > >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > >>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > > >>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB
> > > >> 56063,
> > > >>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> > > >>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB
> > > >> 56063,
> > > >>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> > > >>>>>> This e-mail and its attachments contain confidential information
> > > >> from
> > > >>>>>> HUAWEI, which is intended only for the person or entity whose
> > > >> address
> > > >>>> is
> > > >>>>>> listed above. Any use of the information contained herein in any
> > > >> way
> > > >>>>>> (including, but not limited to, total or partial disclosure,
> > > >>>>> reproduction,
> > > >>>>>> or dissemination) by persons other than the intended
> recipient(s)
> > > >> is
> > > >>>>>> prohibited. If you receive this e-mail in error, please notify
> the
> > > >>>> sender
> > > >>>>>> by phone or email immediately and delete it!
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> -----Original Message-----
> > > >>>>>> From: Jark Wu [mailto:[hidden email]]
> > > >>>>>> Sent: Tuesday, January 24, 2017 6:53 AM
> > > >>>>>> To: [hidden email]
> > > >>>>>> Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
> > > >>> Windows
> > > >>>>> for
> > > >>>>>> streaming tables
> > > >>>>>>
> > > >>>>>> Hi Fabian,
> > > >>>>>>
> > > >>>>>> Thanks for bringing up this discussion and the nice approach to
> > > >> avoid
> > > >>>>>> overlapping contributions.
> > > >>>>>>
> > > >>>>>> All of these make sense to me. But I have some questions.
> > > >>>>>>
> > > >>>>>> Q1: If I understand correctly, we will not support TumbleRows
> and
> > > >>>>>> SessionRows at the beginning. But maybe support them as a syntax
> > > >>> sugar
> > > >>>>> (in
> > > >>>>>> Table API) when the SlideRows is supported in the future. Right
> ?
> > > >>>>>>
> > > >>>>>> Q2: How to support SessionRows based on SlideRows ?  I don't get
> > > >> how
> > > >>> to
> > > >>>>>> partition on "gap-separated".
> > > >>>>>>
> > > >>>>>> Q3: Should we break down the approach into smaller tasks for
> > > >>> streaming
> > > >>>>>> tables and batch tables ?
> > > >>>>>>
> > > >>>>>> Q4: The implementaion of SlideRows still need a custom operator
> > > >> that
> > > >>>>>> collects records in a priority queue ordered by the "rowtime",
> > > >> which
> > > >>> is
> > > >>>>>> similar to the design we discussed in FLINK-4697, right?
> > > >>>>>>
> > > >>>>>> +1 not support for OVER ROW for event time at this point.
> > > >>>>>>
> > > >>>>>> Regards, Jark
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>> 在 2017年1月24日,上午10:28,Hongyuhong <[hidden email]> 写道:
> > > >>>>>>>
> > > >>>>>>> Hi,
> > > >>>>>>> We are also interested in streaming sql and very willing to
> > > >>>> participate
> > > >>>>>> and contribute.
> > > >>>>>>>
> > > >>>>>>> We are now in progress and we will also contribute to calcite
> to
> > > >>> push
> > > >>>>>> forward the window and stream-join support.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> --------------
> > > >>>>>>> Sender: Fabian Hueske [mailto:[hidden email]] Send Time:
> > > >>>> 2017年1月24日
> > > >>>>>>> 5:55
> > > >>>>>>> Receiver: [hidden email]
> > > >>>>>>> Theme: Re: [DISCUSS] Development of SQL OVER / Table API Row
> > > >>> Windows
> > > >>>>>>> for streaming tables
> > > >>>>>>>
> > > >>>>>>> Hi Haohui,
> > > >>>>>>>
> > > >>>>>>> our plan was in fact to piggy-back on Calcite and use the
> TUMBLE
> > > >>>>>> function [1] once is it is available (CALCITE-1345 [2]).
> > > >>>>>>> Unfortunately, this issue does not seem to be very active, so I
> > > >>> don't
> > > >>>>>> know what the progress is.
> > > >>>>>>>
> > > >>>>>>> I would suggest to move the discussion about group windows to a
> > > >>>>> separate
> > > >>>>>> thread and keep this one focused on the organization of the SQL
> > > >> OVER
> > > >>>>>> windows.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Fabian
> > > >>>>>>>
> > > >>>>>>> [1] http://calcite.apache.org/docs/stream.html)
> > > >>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > > >>>>>>>
> > > >>>>>>> 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
> > > >>>>>>>
> > > >>>>>>>> Hi Fabian,
> > > >>>>>>>>
> > > >>>>>>>> FLINK-4692 has added the support for tumbling window and we
> are
> > > >>>>>>>> excited to try it out and expose it as a SQL construct.
> > > >>>>>>>>
> > > >>>>>>>> Just curious -- what's your thought on the SQL syntax on
> > > >> tumbling
> > > >>>>>> window?
> > > >>>>>>>>
> > > >>>>>>>> Implementation wise it might make sense to think tumbling
> window
> > > >>> as
> > > >>>> a
> > > >>>>>>>> special case of the sliding window.
> > > >>>>>>>>
> > > >>>>>>>> The problem I see is that the OVER construct might be
> > > >> insufficient
> > > >>>> to
> > > >>>>>>>> support all the use cases of tumbling windows. For example, it
> > > >>> fails
> > > >>>>>>>> to express tumbling windows that have fractional time units
> (as
> > > >>>>>>>> pointed out in http://calcite.apache.org/docs/stream.html).
> > > >>>>>>>>
> > > >>>>>>>> It looks to me that the Calcite / Azure Stream Analytics have
> > > >>>>>>>> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to
> address
> > > >>> this
> > > >>>>>> issue.
> > > >>>>>>>>
> > > >>>>>>>> Do you think it is a good idea to follow the same conventions?
> > > >>> Your
> > > >>>>>>>> ideas are appreciated.
> > > >>>>>>>>
> > > >>>>>>>> Regards,
> > > >>>>>>>> Haohui
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <
> [hidden email]>
> > > >>>>> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> +1
> > > >>>>>>>>>
> > > >>>>>>>>> We are also quite interested in these features and would love
> > > >> to
> > > >>>>>>>>> participate and contribute.
> > > >>>>>>>>>
> > > >>>>>>>>> ~Haohui
> > > >>>>>>>>>
> > > >>>>>>>>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <
> > > >> [hidden email]
> > > >>>>
> > > >>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hi everybody,
> > > >>>>>>>>>>
> > > >>>>>>>>>> it seems that currently several contributors are working on
> > > >> new
> > > >>>>>>>>>> features for the streaming Table API / SQL around row
> windows
> > > >>> (as
> > > >>>>>>>>>> defined in
> > > >>>>>>>>>> FLIP-11
> > > >>>>>>>>>> [1]) and SQL OVER-style window (FLINK-4678, FLINK-4679,
> > > >>>> FLINK-4680,
> > > >>>>>>>>>> FLINK-5584).
> > > >>>>>>>>>> Since these efforts overlap quite a bit I spent some time
> > > >>> thinking
> > > >>>>>>>>>> about how we can approach these features and how to avoid
> > > >>>>>>>>>> overlapping contributions.
> > > >>>>>>>>>>
> > > >>>>>>>>>> The challenge here is the following. Some of the Table API
> row
> > > >>>>>>>>>> windows
> > > >>>>>>>> as
> > > >>>>>>>>>> defined by FLIP-11 [1] are basically SQL OVER windows while
> > > >>> other
> > > >>>>>>>>>> cannot be easily expressed as such (TumbleRows for row-count
> > > >>>>>>>>>> intervals, SessionRows).
> > > >>>>>>>>>> However, since Calcite already supports SQL OVER windows, we
> > > >> can
> > > >>>>>>>>>> reuse
> > > >>>>>>>> the
> > > >>>>>>>>>> optimization logic for some of the Table API row windows. I
> > > >> also
> > > >>>>>>>>>> thought about the semantics of the TumbleRows and
> SessionRows
> > > >>>>>>>>>> windows as defined in
> > > >>>>>>>>>> FLIP-11 and came to the conclusion that these are not well
> > > >>> defined
> > > >>>>>>>>>> in
> > > >>>>>>>>>> FLIP-11 and should rather be defined as SlideRows windows
> > > >> with a
> > > >>>>>>>>>> special PARTITION BY clause.
> > > >>>>>>>>>>
> > > >>>>>>>>>> I propose to approach SQL OVER windows and Table API row
> > > >> windows
> > > >>>> as
> > > >>>>>>>>>> follows:
> > > >>>>>>>>>>
> > > >>>>>>>>>> We start with three simple cases for SQL OVER windows (not
> > > >> Table
> > > >>>>>>>>>> API
> > > >>>>>>>> yet):
> > > >>>>>>>>>>
> > > >>>>>>>>>> * OVER RANGE for event time
> > > >>>>>>>>>> * OVER RANGE for processing time
> > > >>>>>>>>>> * OVER ROW for processing time
> > > >>>>>>>>>>
> > > >>>>>>>>>> All cases fulfill the following restrictions:
> > > >>>>>>>>>> - All aggregations in SELECT must refer to the same window.
> > > >>>>>>>>>> - PARTITION BY may not contain the rowtime attribute.
> > > >>>>>>>>>> - ORDER BY must be on rowtime attribute (for event time) or
> > > >> on a
> > > >>>>>>>>>> marker function that indicates processing time. Additional
> > > >> sort
> > > >>>>>>>>>> attributes are not supported initially.
> > > >>>>>>>>>> - only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and
> > > >>> "BETWEEN
> > > >>>> x
> > > >>>>>>>>>> PRECEDING AND CURRENT ROW" are supported.
> > > >>>>>>>>>>
> > > >>>>>>>>>> OVER ROW for event time cannot be easily supported. With
> event
> > > >>>>>>>>>> time, we may have late records which need to be injected
> into
> > > >>> the
> > > >>>>>>>>>> order of records.
> > > >>>>>>>>>> When
> > > >>>>>>>>>> a record in injected in to the order where a row-count
> window
> > > >>> has
> > > >>>>>>>> already
> > > >>>>>>>>>> been computed, this and all following windows will change.
> We
> > > >>>> could
> > > >>>>>>>> either
> > > >>>>>>>>>> drop the record or sent out many retraction records. I think
> > > >> it
> > > >>> is
> > > >>>>>>>>>> best
> > > >>>>>>>> to
> > > >>>>>>>>>> not open this can of worms at this point.
> > > >>>>>>>>>>
> > > >>>>>>>>>> The rational for all of the above restrictions is to have
> > > >> first
> > > >>>>>>>>>> versions of OVER windows soon.
> > > >>>>>>>>>> Once we have the above cases covered we can extend and
> remove
> > > >>>>>>>> limitations
> > > >>>>>>>>>> as follows:
> > > >>>>>>>>>>
> > > >>>>>>>>>> - Table API SlideRow windows (with the same restrictions as
> > > >>>> above).
> > > >>>>>>>>>> This will be mostly API work since the execution part has
> been
> > > >>>>> solved
> > > >>>>>> before.
> > > >>>>>>>>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> > > >>>>>>>>>> - Add support for different windows in SELECT. All windows
> > > >> must
> > > >>> be
> > > >>>>>>>>>> partitioned and ordered in the same way.
> > > >>>>>>>>>> - Add support for additional ORDER BY attributes (besides
> > > >> time).
> > > >>>>>>>>>>
> > > >>>>>>>>>> As I said before, TumbleRows and SessionRows windows as in
> > > >>> FLIP-11
> > > >>>>>>>>>> are
> > > >>>>>>>> not
> > > >>>>>>>>>> well defined, IMO.
> > > >>>>>>>>>> They can be expressed as SlideRows windows with special
> > > >>>>>>>>>> partitioning (partitioning on fixed, non-overlapping time
> > > >> ranges
> > > >>>>>>>>>> for TumbleRows, and gap-separated, non-overlapping time
> ranges
> > > >>> for
> > > >>>>>>>>>> SessionRows) I would not start to work on those yet.
> > > >>>>>>>>>>
> > > >>>>>>>>>> I would like to close all related JIRA issues (FLINK-4678,
> > > >>>>>>>>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the
> > > >>>> development
> > > >>>>>>>>>> of these
> > > >>>>>>>> features
> > > >>>>>>>>>> as outlined above with corresponding JIRA issues.
> > > >>>>>>>>>>
> > > >>>>>>>>>> What do others think? (I cc'ed the contributors assigned to
> > > >> the
> > > >>>>>>>>>> above
> > > >>>>>>>> JIRA
> > > >>>>>>>>>> issues)
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best, Fabian
> > > >>>>>>>>>>
> > > >>>>>>>>>> [1]
> > > >>>>>>>>>>
> > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> > > >>>>>>>> 11%3A+Table+API+Stream+Aggregations
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> > >
> >
>
12