[DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Fabian Hueske-2
Hi everybody,

it seems that several contributors are currently working on new features
for the streaming Table API / SQL around row windows (as defined in FLIP-11
[1]) and SQL OVER-style windows (FLINK-4678, FLINK-4679, FLINK-4680,
FLINK-5584).
Since these efforts overlap quite a bit, I spent some time thinking about
how we can approach these features and avoid overlapping
contributions.

The challenge is the following. Some of the Table API row windows
defined by FLIP-11 [1] are basically SQL OVER windows, while others cannot be
easily expressed as such (TumbleRows for row-count intervals, SessionRows).
However, since Calcite already supports SQL OVER windows, we can reuse the
optimization logic for some of the Table API row windows. I also thought
about the semantics of the TumbleRows and SessionRows windows and came to
the conclusion that they are not well defined in
FLIP-11 and should rather be defined as SlideRows windows with a special
PARTITION BY clause.

I propose to approach SQL OVER windows and Table API row windows as follows:

We start with three simple cases for SQL OVER windows (not Table API yet):

* OVER RANGE for event time
* OVER RANGE for processing time
* OVER ROW for processing time

All cases fulfill the following restrictions:
- All aggregations in SELECT must refer to the same window.
- PARTITION BY may not contain the rowtime attribute.
- ORDER BY must be on the rowtime attribute (for event time) or on a marker
function that indicates processing time. Additional sort attributes are not
supported initially.
- Only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and "BETWEEN x
PRECEDING AND CURRENT ROW" are supported.

OVER ROW for event time cannot be easily supported. With event time, we may
have late records which need to be injected into the order of records. When
a record is injected into the order at a position where a row-count window
has already been computed, this and all following windows will change. We
could either drop the record or send out many retraction records. I think it
is best to not open this can of worms at this point.
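
To make the late-record problem concrete, here is a minimal sketch in plain Python (not Flink code). The function name `row_count_sums`, the sample data, and the choice of a SUM over "2 PRECEDING AND CURRENT ROW" are assumptions made only for illustration. It recomputes a row-count window before and after a late record arrives and reports which already-emitted results would need a retraction.

```python
from typing import Dict, List, Tuple

Event = Tuple[int, int]  # (event-time timestamp, value)


def row_count_sums(events: List[Event], preceding: int) -> List[Tuple[int, int]]:
    """Sum over the current row and up to `preceding` rows before it,
    with rows ordered by event time (hypothetical helper)."""
    ordered = sorted(events)  # order by timestamp
    out = []
    for i, (ts, _) in enumerate(ordered):
        start = max(0, i - preceding)
        out.append((ts, sum(v for _, v in ordered[start:i + 1])))
    return out


on_time = [(1, 10), (2, 20), (4, 40), (5, 50)]
before = row_count_sums(on_time, preceding=2)

# A late record with timestamp 3 arrives after the results above were emitted.
after = row_count_sums(on_time + [(3, 30)], preceding=2)

# Results that were already emitted and are now different.
old: Dict[int, int] = dict(before)
changed = {ts: (old[ts], new) for ts, new in after if ts in old and old[ts] != new}
print(changed)
```

In this small example every result after the insertion point changes. With a bounded frame only the next `preceding` rows are affected; with UNBOUNDED PRECEDING all later rows are.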

The rationale for all of the above restrictions is to have first versions of
OVER windows soon.
Once we have the above cases covered, we can extend and remove limitations
as follows:

- Table API SlideRow windows (with the same restrictions as above). This
will be mostly API work since the execution part has been solved before.
- Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
- Add support for different windows in SELECT. All windows must be
partitioned and ordered in the same way.
- Add support for additional ORDER BY attributes (besides time).

As I said before, TumbleRows and SessionRows windows as in FLIP-11 are not
well defined, IMO.
They can be expressed as SlideRows windows with special partitioning
(partitioning on fixed, non-overlapping time ranges for TumbleRows, and
gap-separated, non-overlapping time ranges for SessionRows).
I would not start to work on those yet.
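
One possible reading of the "special partitioning" idea, sketched in plain Python rather than Flink code. The helper names `running_sum_per_tumble` and `session_keys`, the integer timestamps, and the use of a running SUM are assumptions for illustration and are not taken from FLIP-11. The first function restarts an unbounded-preceding aggregate in every fixed time range; the second derives a partition key that increases whenever the gap between consecutive timestamps exceeds a threshold.

```python
from collections import defaultdict
from typing import List, Tuple


def running_sum_per_tumble(events: List[Tuple[int, int]], size: int) -> List[Tuple[int, int, int]]:
    """Running sum over (timestamp, value) events, partitioned by the
    fixed time range [k * size, (k + 1) * size) the timestamp falls into."""
    totals = defaultdict(int)
    out = []
    for ts, value in sorted(events):
        bucket = ts // size  # hypothetical partition key for TumbleRows
        totals[bucket] += value
        out.append((ts, bucket, totals[bucket]))
    return out


def session_keys(timestamps: List[int], gap: int) -> List[int]:
    """Assign a partition key per timestamp; a new key starts whenever the
    distance to the previous timestamp is larger than `gap`."""
    keys, current, last = [], 0, None
    for ts in sorted(timestamps):
        if last is not None and ts - last > gap:
            current += 1
        keys.append(current)
        last = ts
    return keys


events = [(1, 5), (3, 7), (10, 1), (12, 2), (25, 4)]
tumbled = running_sum_per_tumble(events, size=10)
sessions = session_keys([ts for ts, _ in events], gap=5)
print(tumbled)
print(sessions)
```

The tumbling key depends only on the row's own timestamp, whereas the session key depends on neighbouring rows, which is one reason the session case is harder to express as a plain PARTITION BY.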

I would like to close all related JIRA issues (FLINK-4678, FLINK-4679,
FLINK-4680, FLINK-5584) and restructure the development of these features
as outlined above with corresponding JIRA issues.

What do others think? (I cc'ed the contributors assigned to the above JIRA
issues)

Best, Fabian

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Haohui Mai
+1

We are also quite interested in these features and would love to
participate and contribute.

~Haohui

In reply to the post by Fabian Hueske of Mon, Jan 23, 2017, 7:31 AM.

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Haohui Mai
Hi Fabian,

FLINK-4692 has added support for tumbling windows and we are excited to
try it out and expose it as a SQL construct.

Just curious -- what are your thoughts on the SQL syntax for tumbling windows?

Implementation-wise, it might make sense to think of a tumbling window as a
special case of the sliding window.

The problem I see is that the OVER construct might be insufficient to
support all the use cases of tumbling windows. For example, it fails to
express tumbling windows that have fractional time units (as pointed out in
http://calcite.apache.org/docs/stream.html).

It looks to me that Calcite and Azure Stream Analytics have introduced a
new construct (TUMBLE / TUMBLINGWINDOW) to address this issue.

Do you think it is a good idea to follow the same conventions? Your ideas
are appreciated.

Regards,
Haohui


In reply to the post by Haohui Mai of Mon, Jan 23, 2017, 1:02 PM.

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Fabian Hueske-2
Hi Haohui,

our plan was in fact to piggy-back on Calcite and use the TUMBLE function
[1] once it is available (CALCITE-1345 [2]).
Unfortunately, this issue does not seem to be very active, so I don't know
what the progress is.

I would suggest moving the discussion about group windows to a separate
thread and keeping this one focused on the organization of the SQL OVER
windows.

Best,
Fabian

[1] http://calcite.apache.org/docs/stream.html
[2] https://issues.apache.org/jira/browse/CALCITE-1345

In reply to the post by Haohui Mai of 2017-01-23 22:42 GMT+01:00.

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Hongyuhong
Hi,
We are also interested in streaming SQL and very willing to participate and contribute.

Our work is already in progress, and we will also contribute to Calcite to push forward the window and stream-join support.



In reply to the post by Fabian Hueske of January 24, 2017, 5:55.

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

伍翀(云邪)
Hi Fabian,

Thanks for bringing up this discussion and for the nice approach to avoid overlapping contributions.

All of this makes sense to me, but I have some questions.

Q1: If I understand correctly, we will not support TumbleRows and SessionRows at the beginning, but may support them as syntactic sugar (in the Table API) once SlideRows is supported. Right?

Q2: How can SessionRows be supported based on SlideRows? I don't see how to partition on "gap-separated" time ranges.

Q3: Should we break down the approach into smaller tasks for streaming tables and batch tables?

Q4: The implementation of SlideRows still needs a custom operator that collects records in a priority queue ordered by the "rowtime", which is similar to the design we discussed in FLINK-4697, right?

+1 for not supporting OVER ROW for event time at this point.

Regards, Jark


In reply to the post by Hongyuhong of January 24, 2017, 10:28 AM.

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

jincheng sun
In reply to this post by Fabian Hueske-2
Hello Fabian,
Your plan looks good, and I totally agree with your points.
While working on FLINK-4680, I had similar concerns about the
semantics of TumbleRows and SessionRows. It is much clearer if we define
these windows as SlideRows with a PARTITION BY clause.
Regarding the implementation plan for Table API row windows, I would also
like to share some thoughts on OVER windows that came up while
developing FLINK-4680:

- Table API SlideRow windows (with the same restrictions as above). This
will be mostly API work since the execution part has been solved before.
Though the sliding window can work for the bounded preceding, but it is not
sufficient to support unbounded preceding. For instance, we may potentially
use SlidingProcessingTimeWindows and ProcessingTimeTrigger to implement
“OVER RANGE for processing time”, but we still need to provide a certain
fixed window size, which is not proper for unbounded processing. Same
problems exist for ”OVER RANGE for event time“  and “OVER ROW for
processing time”. Therefore, we need a new window assigner and trigger for
unbounded preceding, say SlideRowGlobalWindows and
SlideRowGlobalWindowXXXTrigger. What do you think?

- Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
If I understand you correctly, you want to implement the SlideRow windows
first without the support of FOLLOWING(I guess you want to leverage the
existing SlidingProcessing(Event)TimeWindows and
Processing(Event)TimeTrigger?). IMO, when we implement SlideRow windows, we
could just provide a new WindowAssigner and trigger, which can support both
bounded preceding and following semantics (current row is just a special
case of FOLLOWING where the following row is equal to 0). What do you think?

- Add support for additional ORDER BY attributes (besides time).
This is an important and a necessary part for OVER. But to achieve this, we
probably need a sorted state backend, maybe sortedMapstate? Is it also
included in your plan.
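
To make the frame semantics discussed in the points above concrete, here is a
minimal sketch in plain Python (not Flink code). The function name
over_rows_sum and its parameters are assumptions made for illustration only.
It evaluates a SUM over a row-count frame on an already ordered list, treating
UNBOUNDED PRECEDING as "no lower bound" and CURRENT ROW as FOLLOWING with an
offset of 0.

```python
from typing import List, Optional


def over_rows_sum(values: List[float],
                  preceding: Optional[int],
                  following: int = 0) -> List[float]:
    """SUM(v) over a ROWS frame, evaluated on an already ordered list.

    preceding=None stands for UNBOUNDED PRECEDING (hypothetical encoding);
    following=0 is the same frame end as CURRENT ROW.
    """
    result = []
    for i in range(len(values)):
        # Lower frame bound: start of the partition if unbounded.
        start = 0 if preceding is None else max(0, i - preceding)
        # Upper frame bound: current row plus `following` rows (exclusive end).
        end = min(len(values), i + following + 1)
        result.append(sum(values[start:end]))
    return result
```

Note that this batch-style sketch hides the streaming problem: with
following > 0, the result for a row cannot be emitted until the following
rows have arrived, and with preceding=None there is no fixed window size that
a sliding window assigner could be configured with.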

Best,
SunJincheng

2017-01-23 23:30 GMT+08:00 Fabian Hueske <[hidden email]>:

> Hi everybody,
>
> it seems that currently several contributors are working on new features
> for the streaming Table API / SQL around row windows (as defined in FLIP-11
> [1]) and SQL OVER-style window (FLINK-4678, FLINK-4679, FLINK-4680,
> FLINK-5584).
> Since these efforts overlap quite a bit I spent some time thinking about
> how we can approach these features and how to avoid overlapping
> contributions.
>
> The challenge here is the following. Some of the Table API row windows as
> defined by FLIP-11 [1] are basically SQL OVER windows while others cannot be
> easily expressed as such (TumbleRows for row-count intervals, SessionRows).
> However, since Calcite already supports SQL OVER windows, we can reuse the
> optimization logic for some of the Table API row windows. I also thought
> about the semantics of the TumbleRows and SessionRows windows as defined in
> FLIP-11 and came to the conclusion that these are not well defined in
> FLIP-11 and should rather be defined as SlideRows windows with a special
> PARTITION BY clause.
>
> I propose to approach SQL OVER windows and Table API row windows as
> follows:
>
> We start with three simple cases for SQL OVER windows (not Table API yet):
>
> * OVER RANGE for event time
> * OVER RANGE for processing time
> * OVER ROW for processing time
>
> All cases fulfill the following restrictions:
> - All aggregations in SELECT must refer to the same window.
> - PARTITION BY may not contain the rowtime attribute.
> - ORDER BY must be on rowtime attribute (for event time) or on a marker
> function that indicates processing time. Additional sort attributes are not
> supported initially.
> - only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and "BETWEEN x
> PRECEDING AND CURRENT ROW" are supported.
>
> OVER ROW for event time cannot be easily supported. With event time, we
> may have late records which need to be injected into the order of records.
> When a record is injected into the order where a row-count window has
> already been computed, this and all following windows will change. We could
> either drop the record or send out many retraction records. I think it is
> best to not open this can of worms at this point.
>
> The rationale for all of the above restrictions is to have first versions
> of OVER windows soon.
> Once we have the above cases covered we can extend and remove limitations
> as follows:
>
> - Table API SlideRow windows (with the same restrictions as above). This
> will be mostly API work since the execution part has been solved before.
> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> - Add support for different windows in SELECT. All windows must be
> partitioned and ordered in the same way.
> - Add support for additional ORDER BY attributes (besides time).
>
> As I said before, TumbleRows and SessionRows windows as in FLIP-11 are not
> well defined, IMO.
> They can be expressed as SlideRows windows with special partitioning
> (partitioning on fixed, non-overlapping time ranges for TumbleRows, and
> gap-separated, non-overlapping time ranges for SessionRows)
> I would not start to work on those yet.
>
> I would like to close all related JIRA issues (FLINK-4678, FLINK-4679,
> FLINK-4680, FLINK-5584) and restructure the development of these features
> as outlined above with corresponding JIRA issues.
>
> What do others think? (I cc'ed the contributors assigned to the above JIRA
> issues)
>
> Best, Fabian
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>
>
>

RE: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Radu Tudoran
In reply to this post by 伍翀(云邪)
Hi all,

Thanks for starting this discussion - it is very useful.
It does indeed make sense to refactor all of these and to coordinate the efforts a bit, so that we avoid overlapping implementations and incompatible solutions.

If you close the JIRA issues you mentioned, do you plan to redesign them and open new ones? Do you need help from our side? We can also pick up the redesign of some of these new JIRA issues. For example, we already have an implementation for this and we can help with the design. Nevertheless, let's coordinate the effort.

Regarding the support for the different types of windows: I think the best option is to split the implementation into small units. We can easily do this from the transformation rule class, so that each particular type of window (session/sliding/sliderows/processing time/...) has a clear implementation and a corresponding architecture within its JIRA issue. What do you think about such a granularity?

Regarding the issue of "Q4: The implementation of SlideRows still needs a custom operator that collects records in a priority queue ordered by the "rowtime", which is similar to the design we discussed in FLINK-4697, right?"
Why would you need this operator? The window buffer can act to some extent as a priority queue as long as the trigger and evictor are set to work based on the rowtime - or maybe I am missing something... Can you please clarify this?


Dr. Radu Tudoran

-----Original Message-----
From: Jark Wu [mailto:[hidden email]]
Sent: Tuesday, January 24, 2017 6:53 AM
To: [hidden email]
Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Hi Fabian,

Thanks for bringing up this discussion and the nice approach to avoid overlapping contributions.

All of these make sense to me. But I have some questions.

Q1: If I understand correctly, we will not support TumbleRows and SessionRows at the beginning, but may support them as syntactic sugar (in the Table API) once SlideRows is supported. Right?

Q2: How can SessionRows be supported based on SlideRows? I don't get how to partition on "gap-separated".

Q3: Should we break down the approach into smaller tasks for streaming tables and batch tables?

Q4: The implementation of SlideRows still needs a custom operator that collects records in a priority queue ordered by the "rowtime", which is similar to the design we discussed in FLINK-4697, right?

+1 for not supporting OVER ROW for event time at this point.

Regards, Jark



Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Fabian Hueske-2
In reply to this post by 伍翀(云邪)
Hi Jark,

let me answer your questions:

Q1: Yes, that's what I am currently thinking about.

Q2: You can interpret a session id as a partitioning attribute. If you have
OVER (PARTITION BY a, SessionWithGap(rowtime)), "a" would be a regular
partitioning attribute and "SessionWithGap(rowtime)" would logically be the
ID of the session a record belongs to. Within a partition you can still use
ORDER BY, PRECEDING and FOLLOWING to define the records over which the
aggregate of each row should be computed.

Q3: My proposal focused on OVER windows for streaming tables. For batch
tables, I would suggest starting from the SQL side as well and adding the
Table API as a second step. Of course, the batch side does not need to have
as many restrictions as streaming (although we can also start with many
restrictions and extend features later).

Q4: Yes, I think so. ProcessFunction might be the way to go (unless
somebody has a better idea). This might take some effort to implement but
gives us a lot of flexibility when adding features such as retraction and
configurable update rates. We have to think about the performance
implications though. Better runtime abstractions for sorted state might be
helpful.
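
The answer to Q2 can be illustrated with a small sketch in plain Python (not
Flink code). The names assign_session_ids and running_count are hypothetical,
and the gap rule used here (a new session starts when the distance to the
previous record of the same key exceeds the gap) is an assumption. The first
function plays the role of SessionWithGap(rowtime) by deriving a session ID
per record; the second computes a running count over the partition
(a, session ID), i.e., an UNBOUNDED PRECEDING aggregate within each session.

```python
from typing import Dict, List, Tuple


def assign_session_ids(rows: List[Tuple[str, int]],
                       gap: int) -> List[Tuple[str, int, int]]:
    """Turn (key, rowtime) into (key, rowtime, session_id).

    Session IDs are numbered per key, starting at 0. A record opens a new
    session if it is more than `gap` time units after the previous record
    of the same key (assumed rule for this sketch).
    """
    last_seen: Dict[str, Tuple[int, int]] = {}  # key -> (last rowtime, session id)
    out = []
    for key, ts in sorted(rows, key=lambda r: (r[0], r[1])):
        if key not in last_seen:
            sid = 0
        else:
            prev_ts, prev_sid = last_seen[key]
            sid = prev_sid if ts - prev_ts <= gap else prev_sid + 1
        last_seen[key] = (ts, sid)
        out.append((key, ts, sid))
    return out


def running_count(rows_with_sid: List[Tuple[str, int, int]]
                  ) -> List[Tuple[str, int, int, int]]:
    """COUNT(*) per row over the partition (key, session_id), in input order."""
    counts: Dict[Tuple[str, int], int] = {}
    out = []
    for key, ts, sid in rows_with_sid:
        counts[(key, sid)] = counts.get((key, sid), 0) + 1
        out.append((key, ts, sid, counts[(key, sid)]))
    return out
```

The sketch sorts its input, which a streaming operator cannot do; it only
shows that once a session ID is available, sessions behave like any other
partitioning attribute.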

Best, Fabian

2017-01-24 6:53 GMT+01:00 Jark Wu <[hidden email]>:

> Hi Fabian,
>
> Thanks for bringing up this discussion and the nice approach to avoid
> overlapping contributions.
>
> All of these make sense to me. But I have some questions.
>
> Q1: If I understand correctly, we will not support TumbleRows and
> SessionRows at the beginning, but may support them as syntactic sugar (in
> the Table API) once SlideRows is supported. Right?
>
> Q2: How can SessionRows be supported based on SlideRows? I don't get how
> to partition on "gap-separated".
>
> Q3: Should we break down the approach into smaller tasks for streaming
> tables and batch tables?
>
> Q4: The implementation of SlideRows still needs a custom operator that
> collects records in a priority queue ordered by the "rowtime", which is
> similar to the design we discussed in FLINK-4697, right?
>
> +1 for not supporting OVER ROW for event time at this point.
>
> Regards, Jark

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Fabian Hueske-2
In reply to this post by jincheng sun
Hi SunJincheng,

thanks a lot for your comments!

regarding the suitability of DataStream sliding windows: You are right that
UNBOUNDED PRECEDING windows cannot be implemented as DataStream sliding
windows. The same is true for OVER RANGE windows.
I think the only OVER windows that could be done with DataStream sliding
windows are bounded OVER ROW windows (processing time). For the other
window types, I was thinking about implementing them using a
ProcessFunction. We might need additional support for sorted state to
improve the efficiency of the implementation.
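
As a rough illustration of the ProcessFunction-with-sorted-state idea, the
following plain Python sketch (not Flink code) handles one partition key of a
bounded event-time OVER RANGE window. The class EventTimeRangeSum and its
methods on_element and on_watermark are hypothetical names that only mimic
the callback style of such an operator. Records are buffered in rowtime order
and results are emitted once a watermark passes them; late records are simply
dropped, which is an assumption of this sketch.

```python
import bisect
from typing import List, Tuple


class EventTimeRangeSum:
    """SUM(v) OVER (ORDER BY rowtime RANGE BETWEEN size PRECEDING AND CURRENT ROW)
    for a single partition key."""

    def __init__(self, size: int):
        self.size = size
        # Rows not yet covered by a watermark, kept sorted by rowtime.
        self.pending: List[Tuple[int, float]] = []
        # Rows already emitted that may still fall into later ranges.
        self.retained: List[Tuple[int, float]] = []
        self.watermark = float("-inf")

    def on_element(self, ts: int, value: float) -> None:
        if ts <= self.watermark:
            return  # late record: dropped in this sketch
        bisect.insort(self.pending, (ts, value))

    def on_watermark(self, wm: int) -> List[Tuple[int, float]]:
        self.watermark = wm
        # All pending rows with rowtime <= wm are now complete.
        split = bisect.bisect_right(self.pending, (wm, float("inf")))
        ready, self.pending = self.pending[:split], self.pending[split:]
        self.retained.extend(ready)
        results = []
        for ts, _ in ready:
            # RANGE frame: every row whose rowtime lies in [ts - size, ts].
            total = sum(v for t, v in self.retained if ts - self.size <= t <= ts)
            results.append((ts, total))
        # Rows at or before wm - size cannot be part of any future range.
        self.retained = [(t, v) for t, v in self.retained if t > wm - self.size]
        return results
```

The linear scan over the retained rows is where better sorted-state support
would help: a state abstraction with range lookups would avoid touching rows
outside the frame.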

The motivation to restrict the first version to not support FOLLOWING was
to keep the implementation simpler. I am not convinced that the best
solution to implement OVER windows is to use the DataStream window
framework (window assigner, trigger, evictor). Using a more flexible
framework (at the cost of additional implementation overhead) might pay off
when we want to add more features.

You are right, we probably need better support for sorted state. I think we
will need this as well, when implementing the OVER RANGE windows which
cannot be easily implemented in the DataStream window framework.
A thorough design document is required here.

Best,
Fabian


Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Fabian Hueske-2
In reply to this post by Radu Tudoran
Hi Radu,

thanks for your comments!

Yes, my intention is to open new JIRA issues to structure the development
process. Everybody is very welcome to pick up issues and discuss the design
proposals.
At the moment I see the following six issues to start with:

- streaming SQL OVER ROW for processing time
  - bounded PRECEDING
  - unbounded PRECEDING

- streaming SQL OVER RANGE for processing time
  - bounded PRECEDING
  - unbounded PRECEDING

- streaming SQL OVER RANGE for event time
  - bounded PRECEDING
  - unbounded PRECEDING

For each of these windows we need corresponding translation rules and
execution code.
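
To make the difference between the ROW and RANGE variants listed above concrete, here is a minimal Python sketch. It assumes standard SQL frame semantics: a ROWS frame counts physical rows, while a RANGE frame covers an interval of the ordering value and includes peer rows that share the current row's timestamp. The function names and the sample data are hypothetical and not part of Flink or Calcite.

```python
from bisect import bisect_left, bisect_right


def over_rows_sum(rows, preceding=None):
    """SUM(value) over a ROWS frame ending at the current row.

    rows: list of (timestamp, value) sorted by timestamp.
    preceding: number of preceding rows, or None for UNBOUNDED PRECEDING.
    """
    result = []
    for i in range(len(rows)):
        start = 0 if preceding is None else max(0, i - preceding)
        result.append(sum(value for _, value in rows[start:i + 1]))
    return result


def over_range_sum(rows, interval=None):
    """SUM(value) over a RANGE frame ending at the current row.

    interval: width of the time range, or None for UNBOUNDED PRECEDING.
    """
    timestamps = [ts for ts, _ in rows]
    result = []
    for ts in timestamps:
        start = 0 if interval is None else bisect_left(timestamps, ts - interval)
        # A RANGE frame also contains all peers with the same timestamp.
        end = bisect_right(timestamps, ts)
        result.append(sum(value for _, value in rows[start:end]))
    return result


rows = [(1, 10), (2, 20), (2, 30), (7, 40)]
print(over_rows_sum(rows, 1))    # [10, 30, 50, 70]
print(over_range_sum(rows, 2))   # [10, 60, 60, 40]
```

The two rows with timestamp 2 get different ROWS results but the same RANGE result, which is one reason why the two variants need separate translation rules and execution code.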

Subsequent JIRAs would be
- extending the Table API for supported SQL windows
- add support for FOLLOWING
- etc.

Regarding the requirement for a sorted state: I am not sure whether the OVER
windows should be implemented using Flink's DataStream window framework.
We need a good design document to figure out the best approach. A
ProcessFunction with a sorted state might be a good solution as well.

Best, Fabian


2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]>:

> Hi all,
>
> Thanks for starting this discussion - it is very useful.
> It does indeed make sense to refactor all of this and to coordinate the
> efforts a bit, so that we avoid overlapping implementations and incompatible solutions.
>
> If you close the 3 jira issues you mentioned - do you plan to redesign
> them and open new ones? Do you need help from our side - we can also pick
> the redesign of some of these new jira issues. For example we already have
> an implementation for this and we can help with the design. Nevertheless,
> let's coordinate the effort.
>
> Regarding the support for the different types of windows - I think the best
> option is to split the implementation into small units. We can easily do this
> from the transformation rule class, so that each particular type of
> window (session/sliding/sliderows/processing time/...) has a clear
> implementation and a corresponding architecture within its JIRA issue. What
> do you think about such a granularity?
>
> Regarding the issue of "Q4: The implementation of SlideRows still needs a
> custom operator that collects records in a priority queue ordered by the
> "rowtime", which is similar to the design we discussed in FLINK-4697,
> right?"
> Why would you need this operator? The window buffer can act to some extent
> as a priority queue as long as the trigger and evictor are set to work based
> on the rowtime - or maybe I am missing something... Can you please clarify
> this?
>
>
> Dr. Radu Tudoran
> Senior Research Engineer - Big Data Expert
> IT R&D Division
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
>
> E-mail: [hidden email]
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
>
> -----Original Message-----
> From: Jark Wu [mailto:[hidden email]]
> Sent: Tuesday, January 24, 2017 6:53 AM
> To: [hidden email]
> Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for
> streaming tables
>
> Hi Fabian,
>
> Thanks for bringing up this discussion and the nice approach to avoid
> overlapping contributions.
>
> All of these make sense to me. But I have some questions.
>
> Q1: If I understand correctly, we will not support TumbleRows and
> SessionRows at the beginning, but we may support them as syntactic sugar (in
> the Table API) once SlideRows is supported in the future. Right?
>
> Q2: How can SessionRows be supported based on SlideRows? I don't get how to
> partition on "gap-separated" ranges.
>
> Q3: Should we break down the approach into smaller tasks for streaming
> tables and batch tables?
>
> Q4: The implementation of SlideRows still needs a custom operator that
> collects records in a priority queue ordered by the "rowtime", which is
> similar to the design we discussed in FLINK-4697, right?
>
> +1 for not supporting OVER ROW for event time at this point.
>
> Regards, Jark
>
>
> > On January 24, 2017, at 10:28 AM, Hongyuhong <[hidden email]> wrote:
> >
> > Hi,
> > We are also interested in streaming SQL and are very willing to participate
> and contribute.
> >
> > Our work is in progress, and we will also contribute to Calcite to push
> forward the window and stream-join support.
> >
> >
> >
> > --------------
> > Sender: Fabian Hueske [mailto:[hidden email]] Send Time: January 24, 2017, 5:55
> > Receiver: [hidden email]
> > Theme: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows
> > for streaming tables
> >
> > Hi Haohui,
> >
> > our plan was in fact to piggy-back on Calcite and use the TUMBLE
> function [1] once it is available (CALCITE-1345 [2]).
> > Unfortunately, this issue does not seem to be very active, so I don't
> know what the progress is.
> >
> > I would suggest to move the discussion about group windows to a separate
> thread and keep this one focused on the organization of the SQL OVER
> windows.
> >
> > Best,
> > Fabian
> >
> > [1] http://calcite.apache.org/docs/stream.html
> > [2] https://issues.apache.org/jira/browse/CALCITE-1345
> >
> > 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
> >
> >> Hi Fabian,
> >>
> >> FLINK-4692 has added the support for tumbling window and we are
> >> excited to try it out and expose it as a SQL construct.
> >>
> >> Just curious -- what are your thoughts on the SQL syntax for tumbling
> windows?
> >>
> >> Implementation-wise, it might make sense to think of a tumbling window as a
> >> special case of the sliding window.
> >>
> >> The problem I see is that the OVER construct might be insufficient to
> >> support all the use cases of tumbling windows. For example, it fails
> >> to express tumbling windows that have fractional time units (as
> >> pointed out in http://calcite.apache.org/docs/stream.html).
> >>
> >> It looks to me like Calcite and Azure Stream Analytics have
> >> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address this
> issue.
> >>
> >> Do you think it is a good idea to follow the same conventions? Your
> >> ideas are appreciated.
> >>
> >> Regards,
> >> Haohui
> >>
> >>
> >> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <[hidden email]> wrote:
> >>
> >>> +1
> >>>
> >>> We are also quite interested in these features and would love to
> >>> participate and contribute.
> >>>
> >>> ~Haohui
> >>>
> >>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <[hidden email]>
> wrote:
> >>>
> >>>> Hi everybody,
> >>>>
> >>>> it seems that currently several contributors are working on new
> >>>> features for the streaming Table API / SQL around row windows (as
> >>>> defined in FLIP-11 [1]) and SQL OVER-style windows (FLINK-4678,
> >>>> FLINK-4679, FLINK-4680, FLINK-5584).
> >>>> Since these efforts overlap quite a bit I spent some time thinking
> >>>> about how we can approach these features and how to avoid
> >>>> overlapping contributions.
> >>>>
> >>>> The challenge here is the following. Some of the Table API row
> >>>> windows as defined by FLIP-11 [1] are basically SQL OVER windows while
> >>>> others cannot be easily expressed as such (TumbleRows for row-count
> >>>> intervals, SessionRows).
> >>>> However, since Calcite already supports SQL OVER windows, we can
> >>>> reuse the optimization logic for some of the Table API row windows. I
> >>>> also thought about the semantics of the TumbleRows and SessionRows
> >>>> windows as defined in FLIP-11 and came to the conclusion that these
> >>>> are not well defined in FLIP-11 and should rather be defined as
> >>>> SlideRows windows with a special PARTITION BY clause.
> >>>>
> >>>> I propose to approach SQL OVER windows and Table API row windows as
> >>>> follows:
> >>>>
> >>>> We start with three simple cases for SQL OVER windows (not Table
> >>>> API yet):
> >>>>
> >>>> * OVER RANGE for event time
> >>>> * OVER RANGE for processing time
> >>>> * OVER ROW for processing time
> >>>>
> >>>> All cases fulfill the following restrictions:
> >>>> - All aggregations in SELECT must refer to the same window.
> >>>> - PARTITION BY may not contain the rowtime attribute.
> >>>> - ORDER BY must be on rowtime attribute (for event time) or on a
> >>>> marker function that indicates processing time. Additional sort
> >>>> attributes are not supported initially.
> >>>> - only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and "BETWEEN x
> >>>> PRECEDING AND CURRENT ROW" are supported.
> >>>>
> >>>> OVER ROW for event time cannot be easily supported. With event
> >>>> time, we may have late records which need to be injected into the
> >>>> order of records. When a record is injected into the order where a
> >>>> row-count window has already been computed, this and all following
> >>>> windows will change. We could either drop the record or send out many
> >>>> retraction records. I think it is best not to open this can of worms
> >>>> at this point.
> >>>>
> >>>> The rationale for all of the above restrictions is to have first
> >>>> versions of OVER windows soon.
> >>>> Once we have the above cases covered we can extend and remove
> >>>> limitations as follows:
> >>>>
> >>>> - Table API SlideRow windows (with the same restrictions as above).
> >>>> This will be mostly API work since the execution part has been solved
> >>>> before.
> >>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> >>>> - Add support for different windows in SELECT. All windows must be
> >>>> partitioned and ordered in the same way.
> >>>> - Add support for additional ORDER BY attributes (besides time).
> >>>>
> >>>> As I said before, TumbleRows and SessionRows windows as in FLIP-11
> >>>> are not well defined, IMO.
> >>>> They can be expressed as SlideRows windows with special
> >>>> partitioning (partitioning on fixed, non-overlapping time ranges
> >>>> for TumbleRows, and gap-separated, non-overlapping time ranges for
> >>>> SessionRows). I would not start to work on those yet.
> >>>>
> >>>> I would like to close all related JIRA issues (FLINK-4678,
> >>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the development
> >>>> of these features as outlined above with corresponding JIRA issues.
> >>>>
> >>>> What do others think? (I cc'ed the contributors assigned to the
> >>>> above JIRA issues)
> >>>>
> >>>> Best, Fabian
> >>>>
> >>>> [1]
> >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
> >>>>
> >>>
> >>
>
>

RE: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Radu Tudoran
Hi Fabian,

Thanks for the feedback and for the clarifications regarding the new JIRA issues.
How should we proceed from here? Will you start creating them, or can we also take on the design of some of these issues?

I am particularly interested in "streaming SQL OVER RANGE for processing time",
if you want to share the workload :)


Regarding whether to implement OVER windows on Flink's DataStream window framework or with something like ProcessFunction, I have a slightly different opinion: I believe that windows are better suited for this implementation, for two reasons:
- Stream windows are semantically richer and more evolved than the Stream SQL windows, so in principle they can cover all cases.
- If we extend ProcessFunction to provide this implementation, we have two cases:
        1) Aggregates over the whole stream, e.g. << SELECT STREAM COUNT (*) FROM streaminput >>, for which we can indeed easily rely on ProcessFunction.
        2) Aggregates with bounds, e.g. << COUNT(*) OVER streaminput (RANGE INTERVAL '10' MINUTE PRECEDING) >>, where the aggregate needs to be updated based on the contents of the "window". If we try to implement this behavior with ProcessFunction, we end up adding a window buffer to it, which turns it into an actual window operator.
-> I believe we could define a special type of window for this case and still rely on the rich semantics that already exist in windows. We could call it SlidingRowWindow or RollingWindow, whatever the name would be; it would be a specialized window for this purpose. We could perhaps also enhance the window with a sorting function. Let me know what you think about this.
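
The two cases above can be illustrated with a small Python sketch. It is only an illustration under stated assumptions: rows arrive in timestamp order (as with processing time), and the class names `UnboundedCount` and `BoundedRangeCount` are hypothetical, not Flink APIs. The unbounded aggregate needs only a running value, while the bounded one has to buffer the rows that are still inside the range so that they can be dropped later.

```python
from collections import deque


class UnboundedCount:
    """Case 1: COUNT(*) over the whole stream needs only a running counter."""

    def __init__(self):
        self.count = 0

    def on_row(self, ts):
        self.count += 1
        return self.count


class BoundedRangeCount:
    """Case 2: COUNT(*) over the last `interval` time units needs a buffer."""

    def __init__(self, interval):
        self.interval = interval
        self.buffer = deque()  # timestamps of the rows still inside the range

    def on_row(self, ts):
        self.buffer.append(ts)
        # Evict rows that fell out of the range [ts - interval, ts].
        while self.buffer[0] < ts - self.interval:
            self.buffer.popleft()
        return len(self.buffer)


arrivals = [0, 5, 10, 11, 25]
unbounded = UnboundedCount()
bounded = BoundedRangeCount(interval=10)
print([unbounded.on_row(ts) for ts in arrivals])  # [1, 2, 3, 4, 5]
print([bounded.on_row(ts) for ts in arrivals])    # [1, 2, 3, 3, 1]
```

The buffer in the second class is the part that resembles a window buffer, regardless of whether it lives in a window operator or in a function with its own state.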


Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: [hidden email]
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!



Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Shaoxuan Wang
In reply to this post by Fabian Hueske-2
Hi everyone,
Thanks for this great discussion. I am glad to see that more and more people are
interested in stream SQL and the Table API.

IMO, the key problems for the OVER window design are the SQL semantics and the
runtime design. I totally agree with Fabian that we should skip the design
of TumbleRows and SessionRows windows for now, as they are not well defined
in SQL semantics.

Runtime design is the most crucial part for us, and the part we
volunteer to contribute to. We have thousands of machines running Flink
streaming jobs. The costs in terms of CPU, memory, and state are vital
factors that we have to take into account. We have been working on the
design of OVER windows over the past months and are planning to send out a
detailed design doc to DEV quite soon. But since Fabian started a
good discussion on OVER windows, I would like to share our ideas and thoughts
about their runtime design.

   1. As SunJincheng pointed out earlier, a sliding window does not work for
   UNBOUNDED PRECEDING, so we need an alternative approach for unbounded OVER
   windows.
   2. Although a sliding window may work for some cases of bounded windows,
   it is not very efficient and therefore should not be used in production. To the
   best of my understanding, the current runtime implementation of sliding
   windows does not leverage the concept of state panes yet. This means that
   if we use sliding windows for OVER windows, a backend state will be
   created for each group (PARTITION BY) and each row, and whenever a new
   record arrives, it will be accumulated into all existing windows that have
   not been closed yet. This would cause considerable overhead in terms of
   CPU, memory, and state.
   3. Fabian has mentioned an approach that leverages “ProcessFunction” and
   a “sortedState”. I like this idea, but the design details are not quite
   clear yet, so I would like to add some thoughts on this. Regardless of
   which DataStream API we are going to use (it is very likely that we need
   a new API), we should come up with an optimal approach. The purpose of
   group windows and OVER windows is to partition the data so that we can
   generate the aggregate results. So when we talk about the design of OVER
   windows, we have to think about the aggregates. As we proposed in our recent
   UDAGG doc https://goo.gl/6ntclB, the user-defined accumulator will be
   stored in the aggregate state. Besides the accumulator, we have also introduced
   a retract API for UDAGG. With the aggregate accumulator and the retract API, I
   propose the following runtime approach to implement OVER windows:
      - We first implement a sorted state interface.
      - For each group, we create just one sorted state. When a new record
      arrives, it is inserted into this sorted state and, at the same time,
      accumulated into the aggregate accumulator.
      - For OVER windows, we keep the aggregate accumulator for the entire
      lifetime of the job. This differs from group windows, where we delete the
      accumulator of each group/window when the window is finished.
      - When an OVER window is about to trigger, we fetch the
      previous accumulator from the state, accumulate into it all
      the records up to the upperBoundary of the current window, and retract all
      the out-of-scope records up to its lowerBoundary. We emit the
      aggregate result and save the accumulator for the next window.
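
The accumulate/retract approach described above could look roughly like the following Python sketch. It is a simplification under stated assumptions and not the proposed Flink implementation: the window triggers once per arriving row, rows of one group arrive in timestamp order, the window is bounded by a time range, and a plain sorted list stands in for the sorted state. The names `SumAccumulator` and `OverAggregate` are hypothetical.

```python
import bisect
from collections import defaultdict


class SumAccumulator:
    """A user-defined accumulator that supports retraction."""

    def __init__(self):
        self.total = 0

    def accumulate(self, value):
        self.total += value

    def retract(self, value):
        self.total -= value

    def get_value(self):
        return self.total


class OverAggregate:
    """SUM over the last `interval` time units, kept per group."""

    def __init__(self, interval):
        self.interval = interval
        # One sorted state and one long-lived accumulator per group.
        self.sorted_state = defaultdict(list)
        self.accumulators = defaultdict(SumAccumulator)

    def on_row(self, key, ts, value):
        rows = self.sorted_state[key]
        acc = self.accumulators[key]
        # Insert into the sorted state and accumulate at the same time.
        bisect.insort(rows, (ts, value))
        acc.accumulate(value)
        # Retract every row that is below the lower boundary of this window.
        lower_boundary = ts - self.interval
        while rows[0][0] < lower_boundary:
            _, old_value = rows.pop(0)
            acc.retract(old_value)
        # The accumulator is kept for the next window of the same group.
        return acc.get_value()


agg = OverAggregate(interval=10)
print(agg.on_row("a", 0, 1))    # 1
print(agg.on_row("a", 5, 2))    # 3
print(agg.on_row("b", 6, 100))  # 100
print(agg.on_row("a", 12, 4))   # 6
print(agg.on_row("a", 30, 8))   # 8
```

Each row is accumulated once and retracted at most once, so the cost per row does not depend on how many overlapping windows contain it.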


Hello Fabian,
I would suggest that we first work on the runtime design of OVER
windows and aggregates. Once we have a good design there, one can easily add
the support for SQL as well as the Table API. What do you think?

Regards,
Shaoxuan

On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <[hidden email]> wrote:

> Hi Radu,
>
> thanks for your comments!
>
> Yes, my intention is to open new JIRA issues to structure the development
> process. Everybody is very welcome to pick up issues and discuss the design
> proposals.
> At the moment I see the following six issues to start with:
>
> - streaming SQL OVER ROW for processing time
>   - bounded PRECEDING
>   - unbounded PRECEDING
>
> - streaming SQL OVER RANGE for processing time
>   - bounded PRECEDING
>   - unbounded PRECEDING
>
> - streaming SQL OVER RANGE for event time
>   - bounded PRECEDING
>   - unbounded PRECEDING
>
> For each of these windows we need corresponding translation rules and
> execution code.
>
> Subsequent JIRAs would be
> - extending the Table API for supported SQL windows
> - add support for FOLLOWING
> - etc.
>
> Regarding the requirement for a sorted state. I am not sure if the OVER
> windows should be implemented using Flink's DataStream window framework.
> We need a good design document to figure out what is the best approach. A
> ProcessFunction with a sorted state might be a good solution as well.
>
> Best, Fabian
>
>
> 2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]>:
>
> > Hi all,
> >
> > Thanks for starting these discussion - it is very useful.
> > It does make sense indeed to refactor all these and coordinate a bit the
> > efforts not to have overlapping implementations and incompatible
> solutions.
> >
> > If you close the 3 jira issues you mentioned - do you plan to redesign
> > them and open new ones? Do you need help from our side - we can also pick
> > the redesign of some of these new jira issues. For example we already
> have
> > an implementation for this and we can help with the design. Nevertheless,
> > let's coordinate the effort.
> >
> > Regarding the support for the different types of window - I think the
> best
> > option is to split the implementation in small units. We can easily do
> this
> > from the transformation rule class and with this each particular type of
> > window (session/sliding/sliderows/processing time/...) will have a clear
> > implementation and a corresponding architecture within the jira issue?
> What
> > do you think about such a granularity?
> >
> > Regarding the issue of " Q4: The implementaion of SlideRows still need a
> > custom operator that collects records in a priority queue ordered by the
> > "rowtime", which is similar to the design we discussed in FLINK-4697,
> > right? "
> > Why would you need this operator? The window buffer can act to some
> extent
> > as a priority queue as long as the trigger and evictor is set to work
> based
> > on the rowtime - or maybe I am missing something... Can you please
> clarify
> > this.
> >
> >
> > Dr. Radu Tudoran
> >
> >
> > -----Original Message-----
> > From: Jark Wu [mailto:[hidden email]]
> > Sent: Tuesday, January 24, 2017 6:53 AM
> > To: [hidden email]
> > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows
> > for streaming tables
> >
> > Hi Fabian,
> >
> > Thanks for bringing up this discussion and the nice approach to avoid
> > overlapping contributions.
> >
> > All of this makes sense to me, but I have some questions.
> >
> > Q1: If I understand correctly, we will not support TumbleRows and
> > SessionRows at the beginning, but may support them as syntactic sugar
> > (in the Table API) once SlideRows is supported. Right?
> >
> > Q2: How can SessionRows be supported based on SlideRows? I don't get how
> > to partition on "gap-separated" ranges.
> >
> > Q3: Should we break down the approach into smaller tasks for streaming
> > tables and batch tables?
> >
> > Q4: The implementation of SlideRows still needs a custom operator that
> > collects records in a priority queue ordered by the "rowtime", which is
> > similar to the design we discussed in FLINK-4697, right?
> >
> > +1 for not supporting OVER ROW for event time at this point.
> >
> > Regards, Jark
> >
> >
> > > On January 24, 2017, at 10:28 AM, Hongyuhong <[hidden email]> wrote:
> > >
> > > Hi,
> > > We are also interested in streaming SQL and very willing to participate
> > > and contribute.
> > >
> > > We are currently working on this and will also contribute to Calcite to
> > > push forward the window and stream-join support.
> > >
> > >
> > >
> > > --------------
> > > From: Fabian Hueske [mailto:[hidden email]]
> > > Sent: January 24, 2017 5:55
> > > To: [hidden email]
> > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows
> > > for streaming tables
> > >
> > > Hi Haohui,
> > >
> > > our plan was in fact to piggy-back on Calcite and use the TUMBLE
> > > function [1] once it is available (CALCITE-1345 [2]).
> > > Unfortunately, this issue does not seem to be very active, so I don't
> > > know what the progress is.
> > >
> > > I would suggest moving the discussion about group windows to a separate
> > > thread and keeping this one focused on the organization of the SQL OVER
> > > windows.
> > >
> > > Best,
> > > Fabian
> > >
> > > [1] http://calcite.apache.org/docs/stream.html
> > > [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > >
> > > 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
> > >
> > >> Hi Fabian,
> > >>
> > >> FLINK-4692 has added support for tumbling windows, and we are
> > >> excited to try it out and expose it as a SQL construct.
> > >>
> > >> Just curious -- what are your thoughts on the SQL syntax for tumbling
> > >> windows?
> > >>
> > >> Implementation-wise, it might make sense to treat the tumbling window
> > >> as a special case of the sliding window.
> > >>
> > >> The problem I see is that the OVER construct might be insufficient to
> > >> support all the use cases of tumbling windows. For example, it fails
> > >> to express tumbling windows that have fractional time units (as
> > >> pointed out in http://calcite.apache.org/docs/stream.html).
> > >>
> > >> It looks to me that Calcite and Azure Stream Analytics have
> > >> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address this
> > >> issue.
> > >>
> > >> Do you think it is a good idea to follow the same conventions? Your
> > >> ideas are appreciated.
> > >>
> > >> Regards,
> > >> Haohui
> > >>
> > >>
> > >> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <[hidden email]> wrote:
> > >>
> > >>> +1
> > >>>
> > >>> We are also quite interested in these features and would love to
> > >>> participate and contribute.
> > >>>
> > >>> ~Haohui
> > >>>
> > >>>
> > >>
> >
> >
>

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Hongyuhong
Hi Shaoxuan,

Does that mean there will be only one sorted state to store the records, plus one accumulator that holds the reduced value per group?
Once a record arrives, will it first clean out the expired records and retract them from the reduced value, and then accumulate the new record?
This is clear for processing time, but with event time, if a record arrives late, does that mean all records of the preceding interval (e.g. 10 minutes) have to be recalculated?


-----Original Message-----
From: Shaoxuan Wang [mailto:[hidden email]]
Sent: January 25, 2017 13:43
To: [hidden email]
Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Hi everyone,
Thanks for this great discussion; I am glad to see more and more people interested in stream SQL & the Table API.

IMO, the key problems for the OVER window design are the SQL semantics and the runtime design. I totally agree with Fabian that we should skip the design of TumbleRows and SessionRows windows for now, as they are not well defined in SQL semantics.

Runtime design is the part we are most interested in and have volunteered to contribute to. We have thousands of machines running Flink streaming jobs, so the costs in terms of CPU, memory, and state are vital factors that we have to take into account. We have been working on the design of the OVER window over the past months and are planning to send a detailed design doc to DEV quite soon. But since Fabian started a good discussion on OVER windows, I would like to share our ideas/thoughts about the runtime design for them.

   1. As SunJincheng pointed out earlier, a sliding window does not work for
   UNBOUNDED PRECEDING, so we need an alternative approach for unbounded OVER
   windows.
   2. Though a sliding window may work for some cases of bounded windows,
   it is not very efficient and therefore should not be used in production. To
   the best of my understanding, the current runtime implementation of sliding
   windows does not leverage the concept of state panes yet. This means that
   if we use sliding windows for OVER windows, a backend state will be
   created per group (PARTITION BY) and per row, and whenever a new
   record arrives, it will be accumulated into all existing windows that have
   not been closed yet. This would cause quite a lot of overhead in terms of
   CPU, memory, and state.
   3. Fabian has mentioned an approach that leverages a “ProcessFunction” and
   a “sortedState”. I like this idea. The design details are not quite
   clear yet, so I would like to add more thoughts on this. Regardless of
   which DataStream API we are going to use (it is very likely that we need
   a new API), we should come up with an optimal approach. The purpose of
   group windows and OVER windows is to partition the data such that we can
   generate the aggregate results. So when we talk about the design of the
   OVER window, we have to think about the aggregates. As we proposed in our
   recent UDAGG doc https://goo.gl/6ntclB, the user-defined accumulator will
   be stored in the aggregate state. Besides the accumulator, we have also
   introduced a retract API for UDAGG.
   4. With the aggregate accumulator and the retract API, I am proposing the
   following runtime approach to implement the OVER window:
      - We first implement a sorted state interface.
      - For each group, we create just one sorted state. When a new record
      arrives, it is inserted into this sorted state and at the same time
      accumulated into the aggregate accumulator.
      - For OVER windows, we keep the aggregate accumulator for the entire
      lifetime of the job. This differs from group windows, where we delete
      the accumulator of each group/window once the window is finished.
      - When an OVER window is about to trigger, we grab the
      previous accumulator from the state, accumulate onto it all
      the records up to the upperBoundary of the current window, and retract
      all the out-of-scope records up to its lowerBoundary. We emit the
      aggregate result and save the accumulator for the next window.
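
[Editor's illustration] The accumulate/retract idea in the list above can be sketched roughly as follows. This is not Flink code and not the design from the UDAGG doc: the names SumAccumulator, BoundedOverWindow and over_sum are hypothetical, a plain deque stands in for the proposed sorted state, and rows are assumed to arrive in timestamp order (as with processing time), so late records are not handled.

```python
from collections import deque


class SumAccumulator:
    """Hypothetical accumulator exposing an accumulate/retract pair."""

    def __init__(self):
        self.total = 0

    def accumulate(self, value):
        self.total += value

    def retract(self, value):
        self.total -= value

    def get_value(self):
        return self.total


class BoundedOverWindow:
    """State for one partition key: buffered rows plus ONE long-lived accumulator.

    Assumption: rows arrive in timestamp order, so a deque can stand in
    for the sorted state discussed in the thread.
    """

    def __init__(self, preceding):
        self.preceding = preceding  # width of the RANGE bound, in time units
        self.rows = deque()         # (timestamp, value), oldest first
        self.acc = SumAccumulator()

    def process(self, timestamp, value):
        # Add the new row to the buffer and to the accumulator.
        self.rows.append((timestamp, value))
        self.acc.accumulate(value)
        # Retract rows that fell below the lower boundary of the window.
        lower_boundary = timestamp - self.preceding
        while self.rows and self.rows[0][0] < lower_boundary:
            _, old_value = self.rows.popleft()
            self.acc.retract(old_value)
        return self.acc.get_value()


def over_sum(rows, preceding):
    """Emit one (key, timestamp, sum) result per input row."""
    windows = {}
    results = []
    for key, timestamp, value in rows:
        window = windows.setdefault(key, BoundedOverWindow(preceding))
        results.append((key, timestamp, window.process(timestamp, value)))
    return results


rows = [("a", 1, 10), ("a", 2, 20), ("b", 2, 5), ("a", 5, 30), ("a", 7, 1)]
print(over_sum(rows, preceding=3))
```

Each row costs one accumulate call plus one retract call per expired row, and the state per key is a single buffer and a single accumulator, rather than one state per open window as in point 2.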


Hello Fabian,
I would suggest that we first start working on the runtime design of OVER windows and aggregates. Once we have a good design there, adding support for SQL as well as the Table API should be easy. What do you think?

Regards,
Shaoxuan


Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

liuxinchun
In reply to this post by Shaoxuan Wang
I don't think it is a good idea to process event time in a sliding row window. In a sliding time window, when an element arrives late, we can trigger recalculation of the affected windows. Because the slide period is coarse-grained, only size/slide windows need to be recalculated. In a sliding row window, however, a calculation is triggered for every incoming element, so the slide period becomes fine-grained. When an element arrives late, a large number of "windows" are affected. Even if we store all the raw data, the recomputation is very expensive.

I wonder whether we could define a standard for sliding event-time row windows: when elements arrive late, we recalculate only some of the windows and tolerate some error. For example, we could recalculate only the windows that end in the range (lateElement.timestamp - leftDelta, lateElement.timestamp] and the windows that begin in the range [lateElement.timestamp, lateElement.timestamp + rightDelta).
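
[Editor's illustration] A minimal sketch of the window selection proposed above, under one reading of the two ranges. The function name windows_to_recompute and the representation of a window as a (start, end) pair of timestamps are assumptions made for this example; leftDelta and rightDelta are the tolerances from the message.

```python
def windows_to_recompute(windows, late_ts, left_delta, right_delta):
    """Select the windows to recalculate for a late element.

    windows: list of (start, end) timestamp pairs (hypothetical layout).
    A window is selected if it ends in (late_ts - left_delta, late_ts]
    or begins in [late_ts, late_ts + right_delta).
    """
    selected = []
    for start, end in windows:
        ends_near = late_ts - left_delta < end <= late_ts
        begins_near = late_ts <= start < late_ts + right_delta
        if ends_near or begins_near:
            selected.append((start, end))
    return selected


windows = [(0, 5), (3, 8), (6, 11), (9, 14), (12, 17)]
print(windows_to_recompute(windows, late_ts=9, left_delta=2, right_delta=2))
```

With these inputs the window (6, 11) is not selected although the late timestamp 9 lies inside it. Under this reading, such skipped windows would be the error that the proposal is willing to tolerate.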

On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <[hidden email]> wrote:

> Hi Radu,
>
> thanks for your comments!
>
> Yes, my intention is to open new JIRA issues to structure the
> development process. Everybody is very welcome to pick up issues and
> discuss the design proposals.
> At the moment I see the following six issues to start with:
>
> - streaming SQL OVER ROW for processing time
>   - bounded PRECEDING
>   - unbounded PRECEDING
>
> - streaming SQL OVER RANGE for processing time
>   - bounded PRECEDING
>   - unbounded PRECEDING
>
> - streaming SQL OVER RANGE for event time
>   - bounded PRECEDING
>   - unbounded PRECEDING
>
> For each of these windows we need corresponding translation rules and
> execution code.
>
> Subsequent JIRAs would be
> - extending the Table API for supported SQL windows
> - add support for FOLLOWING
> - etc.
>
> Regarding the requirement for a sorted state. I am not sure if the
> OVER windows should be implemented using Flink's DataStream window framework.
> We need a good design document to figure out what is the best
> approach. A ProcessFunction with a sorted state might be a good solution as well.
>
> Best, Fabian
>
>
> 2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]>:
>
> > Hi all,
> >
> > Thanks for starting this discussion - it is very useful.
> > It does indeed make sense to refactor all of this and to coordinate the
> > efforts a bit, so that we avoid overlapping implementations and
> > incompatible solutions.
> >
> > If you close the 3 JIRA issues you mentioned - do you plan to redesign
> > them and open new ones? Do you need help from our side? We can also pick
> > up the redesign of some of these new JIRA issues. For example, we already
> > have an implementation for this and we can help with the design.
> > Nevertheless, let's coordinate the effort.
> >
> > Regarding the support for the different types of windows - I think the
> > best option is to split the implementation into small units. We can
> > easily do this from the transformation rule class, so that each
> > particular type of window (session/sliding/sliderows/processing time/...)
> > has a clear implementation and a corresponding architecture within its
> > JIRA issue. What do you think about such a granularity?
> >
> > Regarding the issue of "Q4: The implementation of SlideRows still needs a
> > custom operator that collects records in a priority queue ordered by the
> > "rowtime", which is similar to the design we discussed in FLINK-4697,
> > right?"
> > Why would you need this operator? The window buffer can act to some
> > extent as a priority queue as long as the trigger and evictor are set to
> > work based on the rowtime - or maybe I am missing something... Can you
> > please clarify this?
> >
> >
> > Dr. Radu Tudoran
> > Senior Research Engineer - Big Data Expert IT R&D Division
> >
> >
> > -----Original Message-----
> > From: Jark Wu [mailto:[hidden email]]
> > Sent: Tuesday, January 24, 2017 6:53 AM
> > To: [hidden email]
> > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows
> > for streaming tables
> >
> > Hi Fabian,
> >
> > Thanks for bringing up this discussion and the nice approach to avoid
> > overlapping contributions.
> >
> > All of these make sense to me. But I have some questions.
> >
> > Q1: If I understand correctly, we will not support TumbleRows and
> > SessionRows at the beginning, but may support them as syntactic sugar
> > (in the Table API) once SlideRows is supported in the future. Right?
> >
> > Q2: How can SessionRows be supported based on SlideRows? I don't get how
> > to partition on "gap-separated".
> >
> > Q3: Should we break down the approach into smaller tasks for streaming
> > tables and batch tables?
> >
> > Q4: The implementation of SlideRows still needs a custom operator that
> > collects records in a priority queue ordered by the "rowtime", which is
> > similar to the design we discussed in FLINK-4697, right?
> >
> > +1 for not supporting OVER ROW for event time at this point.
> >
> > Regards, Jark
> >
> >
> > > On January 24, 2017, at 10:28 AM, Hongyuhong <[hidden email]> wrote:
> > >
> > > Hi,
> > > We are also interested in streaming SQL and very willing to participate
> > > and contribute.
> > >
> > > Our work is in progress, and we will also contribute to Calcite to push
> > > forward the window and stream-join support.
> > >
> > >
> > >
> > > --------------
> > > From: Fabian Hueske [mailto:[hidden email]]
> > > Sent: January 24, 2017 5:55
> > > To: [hidden email]
> > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows
> > > for streaming tables
> > >
> > > Hi Haohui,
> > >
> > > our plan was in fact to piggy-back on Calcite and use the TUMBLE
> > > function [1] once it is available (CALCITE-1345 [2]).
> > > Unfortunately, this issue does not seem to be very active, so I don't
> > > know what the progress is.
> > >
> > > I would suggest moving the discussion about group windows to a separate
> > > thread and keeping this one focused on the organization of the SQL OVER
> > > windows.
> > >
> > > Best,
> > > Fabian
> > >
> > > [1] http://calcite.apache.org/docs/stream.html
> > > [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > >
> > > 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
> > >
> > >> Hi Fabian,
> > >>
> > >> FLINK-4692 has added support for tumbling windows and we are
> > >> excited to try it out and expose it as a SQL construct.
> > >>
> > >> Just curious -- what are your thoughts on the SQL syntax for tumbling
> > >> windows?
> > >>
> > >> Implementation-wise it might make sense to think of a tumbling window
> > >> as a special case of the sliding window.
> > >>
> > >> The problem I see is that the OVER construct might be insufficient to
> > >> support all the use cases of tumbling windows. For example, it fails
> > >> to express tumbling windows that have fractional time units (as
> > >> pointed out in http://calcite.apache.org/docs/stream.html).
> > >>
> > >> It looks to me that Calcite / Azure Stream Analytics have
> > >> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address this
> > >> issue.
> > >>
> > >> Do you think it is a good idea to follow the same conventions? Your
> > >> ideas are appreciated.
> > >>
> > >> Regards,
> > >> Haohui
> > >>
> > >>
> > >> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <[hidden email]> wrote:
> > >>
> > >>> +1
> > >>>
> > >>> We are also quite interested in these features and would love to
> > >>> participate and contribute.
> > >>>
> > >>> ~Haohui
> > >>>
> >
> >
>

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Shaoxuan Wang
In reply to this post by Hongyuhong
Hi Hongyu,
Nice to virtually meet you here, and thanks for asking. You raised a good
question. It is really about how to refine a result that has already been
emitted, i.e. "retraction". This is another big building block for
streaming that we would like to address in the near future. To the best of
my understanding, the current design does not support refinement of an
already emitted window, so we will not recalculate and send retracted data
for the emitted window.
More specifically, in my proposal the runtime backend saves all records in
a sorted state. For each group it maintains one accumulator state, and the
runtime knows the boundary of the current accumulator state. Late-arriving
events are inserted into the sorted state. Whether a late event is
accumulated into the current accumulator state depends on the boundary. If
the event time of the late record is within the boundary, it is accumulated
onto the accumulator and takes effect in the next window (we do not refine
the results of the emitted window). If your application SLA cannot tolerate
dropping late-arriving records (in the emitted window), you can tune the
watermark to improve QoS.
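
To make the late-record handling above more concrete, here is a minimal sketch, assuming a sum aggregate, an in-memory sorted list in place of the sorted state, and window boundaries that never move backwards. GroupState, on_record and emit_window are hypothetical names used only for illustration, not an existing API.

```python
import bisect


class GroupState:
    """Sorted state plus one accumulator for a single group."""

    def __init__(self):
        self.rows = []     # (event_time, value) pairs, kept sorted
        self.total = 0     # accumulator (a plain sum in this sketch)
        self.lower = None  # boundaries of the last emitted window
        self.upper = None
        self.emitted = []  # results that were sent out, never refined

    def on_record(self, event_time, value):
        # Every record, late or not, goes into the sorted state.
        bisect.insort(self.rows, (event_time, value))
        late = self.upper is not None and event_time <= self.upper
        if late and event_time >= self.lower:
            # Within the boundary: accumulate now, visible in the next window.
            self.total += value
        return late

    def emit_window(self, lower, upper):
        for event_time, value in self.rows:
            in_old = self.upper is not None and self.lower <= event_time <= self.upper
            in_new = lower <= event_time <= upper
            if in_new and not in_old:
                self.total += value   # record newly in scope
            elif in_old and not in_new:
                self.total -= value   # retract record that fell out of scope
        self.lower, self.upper = lower, upper
        self.emitted.append(self.total)
        return self.total
```

A late record inside the current boundary changes only the next result. A late record older than the lower boundary stays in the sorted state but never reaches the accumulator, which corresponds to the "drop" case mentioned above.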

Regards,
Shaoxuan



On Wed, Jan 25, 2017 at 3:16 PM, Hongyuhong <[hidden email]> wrote:

> Hi Shaoxuan,
>
> Does that mean there will be only one sorted state to save records, and an
> accumulator to save the reduced value per group?
> Once a record arrives, it will first clean up the out-of-scope records,
> retract them from the reduced value, and then accumulate the new record?
> This is clear for processing time, but for event time, if a record
> arrives late, does that mean it should recalculate all records of the
> preceding time range, e.g. 10 minutes?
>
>
> -----Original Message-----
> From: Shaoxuan Wang [mailto:[hidden email]]
> Sent: January 25, 2017 13:43
> To: [hidden email]
> Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for
> streaming tables
>
> Hi everyone,
> Thanks for this great discussion, and glad to see more and more people are
> interested in stream SQL & Table API.
>
> IMO, the key problems for the OVER window design are the SQL semantics and
> the runtime design. I totally agree with Fabian that we should skip the
> design of TumbleRows and SessionRows windows for now, as they are not well
> defined in SQL semantics.
>
> Runtime design is the most crucial part we are interested in and have
> volunteered to contribute to. We have thousands of machines running Flink
> streaming jobs. The costs in terms of CPU, memory, and state are vital
> factors that we have to take into account. We have been working on the
> design of the OVER window over the past months and are planning to send
> out a detailed design doc to DEV quite soon. But since Fabian started a
> good discussion on the OVER window, I would like to share our
> ideas/thoughts about the runtime design for the OVER window.
>
>    1. As SunJincheng pointed out earlier, the sliding window does not work
>    for unbounded preceding; we need an alternative approach for the
>    unbounded OVER window.
>    2. Though the sliding window may work for some cases of bounded windows,
>    it is not very efficient and therefore should not be used in production.
>    To the best of my understanding, the current runtime implementation of
>    the sliding window does not leverage the concept of state panes yet.
>    This means that if we use the sliding window for the OVER window, a
>    backend state will be created per group (PARTITION BY) and per row, and
>    whenever a new record arrives, it will be accumulated into all existing
>    windows that have not been closed. This would cause quite a lot of
>    overhead in terms of both CPU and memory & state.
>    3. Fabian has mentioned an approach of leveraging “ProcessFunction” and
>    a “sortedState”. I like this idea. The design details are not quite
>    clear yet, so I would like to add more thoughts on this. Regardless of
>    which DataStream API we are going to use (it is very likely that we need
>    a new API), we should come up with an optimal approach. The purpose of
>    the grouping window and the OVER window is to partition the data such
>    that we can generate the aggregate results. So when we talk about the
>    design of the OVER window, we have to think about the aggregates. As we
>    proposed in our recent UDAGG doc https://goo.gl/6ntclB, the user-defined
>    accumulator will be stored in the aggregate state. Besides the
>    accumulator, we have also introduced a retract API for UDAGG. With the
>    aggregate accumulator and the retract API, I am proposing a runtime
>    approach to implement the OVER window as follows.
>    4.
>       - We first implement a sorted state interface.
>       - For each group, we create just one sorted state. When a new record
>       arrives, it is inserted into this sorted state and, at the same time,
>       accumulated into the aggregate accumulator.
>       - For the OVER window, we keep the aggregate accumulator for the
>       entire lifetime of the job. This differs from the grouping window,
>       where we delete the accumulator of each group/window when the window
>       is finished.
>       - When an OVER window is due to trigger, we grab the previous
>       accumulator from the state, accumulate onto it all the records up to
>       the upperBoundary of the current window, and retract all the
>       out-of-scope records up to its lowerBoundary. We emit the aggregate
>       result and save the accumulator for the next window.
>
>
> Hello Fabian,
> I would suggest that we first work on the runtime design of the OVER
> window and the aggregates. Once we have a good design there, support for
> SQL as well as the Table API can be added easily. What do you think?
>
> Regards,
> Shaoxuan
>

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Shaoxuan Wang
In reply to this post by liuxinchun
Hi Liuxinchun,
I am not sure where you got the impression that anyone has suggested "to
process Event time window in Sliding Row Window". If you were referring to
my post, there may be some misunderstanding. I think you were asking a
question similar to Hongyuhong's. I have just replied to him. Please take a
look and let me know if that makes sense to you. "Retraction" is an
important building block for computing correct incremental results in
streaming. It is another big topic, and we should discuss it in another
thread.

Regards,
Shaoxuan



On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <[hidden email]> wrote:

> I don't think it is a good idea to process event-time windows in a Sliding
> Row Window. In a Sliding Time Window, when an element is late, we can
> trigger the recalculation of the related windows. Since the sliding period
> is coarse-grained, we only need to recalculate size/sliding-period windows.
> But in a Sliding Row Window, the calculation is triggered whenever an
> element arrives, so the sliding period becomes fine-grained. When an
> element is late, many "windows" are affected. Even if we store all the
> raw data, the amount of computation is very large.
>
> I wonder whether it is possible to define a standard for the sliding
> event-time Row Window: when certain elements are late, we recalculate only
> part of the windows and permit some error. For example, we could
> recalculate only the windows that end in the range
> (lateElement.timestamp - leftDelta, lateElement.timestamp] and the windows
> that begin in the range
> [lateElement.timestamp, lateElement.timestamp + rightDelta).
> //////////////////////////////////////////////////////////////////////
>  Hi everyone,
> Thanks for this great discussion, and glad to see more and more people are
> interested on stream SQL & tableAPI.
>
> IMO, the key problems for Over window design are the SQL semantics and the
> runtime design. I totally agree with Fabian that we should skip the design
> of TumbleRows and SessionRows windows for now, as they are not well defined
> in SQL semantics.
>
> Runtime design is the most crucial part we are interested in and
> volunteered to contribute into. We have thousands of machines running flink
> streaming jobs. The costs in terms of CPU, memory, and state are the vital
> factors that we have to taken into account. We have been working on the
> design of OVER window in the past months, and planning to send out a
> detailed design doc to DEV quite soon. But since Fabian started a good
> discussion on OVER window, I would like to share our ideas/thoughts about
> the runtime design for OVER window.
>
>    1. As SunJincheng pointed out earlier, a sliding window does not work
>    for UNBOUNDED PRECEDING; we need an alternative approach for unbounded
>    OVER windows.
>    2. Though a sliding window may work for some cases of bounded windows,
>    it is not very efficient and therefore should not be used in production.
>    To the best of my understanding, the current runtime implementation of
>    sliding windows does not leverage the concept of state panes yet. This
>    means that if we use sliding windows for OVER windows, a backend state
>    will be created for each group (PARTITION BY) and each row, and whenever
>    a new record arrives, it will be accumulated into all existing windows
>    that have not been closed. This would cause quite a lot of overhead in
>    terms of CPU, memory, and state.
>    3. Fabian has mentioned an approach that leverages a "ProcessFunction"
>    and a "sortedState". I like this idea. The design details are not quite
>    clear yet, so I would like to add more thoughts on this. Regardless of
>    which DataStream API we are going to use (it is very likely that we need
>    a new API), we should come up with an optimal approach. The purpose of
>    grouping windows and OVER windows is to partition the data such that we
>    can generate the aggregate results. So when we talk about the design of
>    OVER windows, we have to think about the aggregates. As we proposed in
>    our recent UDAGG doc https://goo.gl/6ntclB, the user-defined accumulator
>    will be stored in the aggregate state. Besides the accumulator, we have
>    also introduced a retract API for UDAGG. With the aggregate accumulator
>    and the retract API, I am proposing the following runtime approach to
>    implement OVER windows.
>    4. The proposed approach:
>       - We first implement a sorted state interface.
>       - For each group, we create just one sorted state. When a new record
>       arrives, it is inserted into this sorted state and, at the same time,
>       accumulated into the aggregate accumulator.
>       - For OVER windows, we keep the aggregate accumulator for the entire
>       lifetime of the job. This is different from grouping windows, where
>       we delete the accumulator for each group/window when the window is
>       finished.
>       - When an OVER window is due to trigger, we grab the previous
>       accumulator from the state, accumulate onto it all the records up to
>       the upper boundary of the current window, and retract all the
>       out-of-scope records up to its lower boundary. We then emit the
>       aggregate result and save the accumulator for the next window.
>
>
> Hello Fabian,
> I would suggest we should first start working on runtime design of over
> window and aggregate. Once we have a good design there, one can easily add
> the support for SQL as well as tableAPI. What do you think?
>
> Regards,
> Shaoxuan
>
> On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <[hidden email]> wrote:
>
> > Hi Radu,
> >
> > thanks for your comments!
> >
> > Yes, my intention is to open new JIRA issues to structure the
> > development process. Everybody is very welcome to pick up issues and
> > discuss the design proposals.
> > At the moment I see the following six issues to start with:
> >
> > - streaming SQL OVER ROW for processing time
> >   - bounded PRECEDING
> >   - unbounded PRECEDING
> >
> > - streaming SQL OVER RANGE for processing time
> >   - bounded PRECEDING
> >   - unbounded PRECEDING
> >
> > - streaming SQL OVER RANGE for event time
> >   - bounded PRECEDING
> >   - unbounded PRECEDING
> >
> > For each of these windows we need corresponding translation rules and
> > execution code.
> >
> > Subsequent JIRAs would be
> > - extending the Table API for supported SQL windows
> > - add support for FOLLOWING
> > - etc.
> >
> > Regarding the requirement for a sorted state. I am not sure if the
> > OVER windows should be implemented using Flink's DataStream window
> > framework. We need a good design document to figure out what is the
> > best approach. A ProcessFunction with a sorted state might be a good
> > solution as well.
> >
> > Best, Fabian
> >
> >
> > 2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]>:
> >
> > > Hi all,
> > >
> > > Thanks for starting this discussion - it is very useful.
> > > It does indeed make sense to refactor all of these and to coordinate
> > > the efforts a bit, so that we do not end up with overlapping
> > > implementations and incompatible solutions.
> > >
> > > If you close the 3 JIRA issues you mentioned - do you plan to
> > > redesign them and open new ones? Do you need help from our side? We
> > > can also pick up the redesign of some of these new JIRA issues. For
> > > example, we already have an implementation for this and we can help
> > > with the design. Nevertheless, let's coordinate the effort.
> > >
> > > Regarding the support for the different types of windows - I think
> > > the best option is to split the implementation into small units. We
> > > can easily do this from the transformation rule class, and with this
> > > each particular type of window (session/sliding/sliderows/processing
> > > time/...) will have a clear implementation and a corresponding
> > > architecture within the JIRA issue. What do you think about such a
> > > granularity?
> > >
> > > Regarding the issue of "Q4: The implementation of SlideRows still
> > > needs a custom operator that collects records in a priority queue
> > > ordered by the "rowtime", which is similar to the design we
> > > discussed in FLINK-4697, right?"
> > > Why would you need this operator? The window buffer can act to some
> > > extent as a priority queue as long as the trigger and evictor are
> > > set to work based on the rowtime - or maybe I am missing
> > > something... Can you please clarify this?
> > >
> > >
> > > Dr. Radu Tudoran
> > > Senior Research Engineer - Big Data Expert IT R&D Division
> > >
> > >
> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > European Research Center
> > > Riesstrasse 25, 80992 München
> > >
> > >
> > >
> > > -----Original Message-----
> > > From: Jark Wu [mailto:[hidden email]]
> > > Sent: Tuesday, January 24, 2017 6:53 AM
> > > To: [hidden email]
> > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows
> > for
> > > streaming tables
> > >
> > > Hi Fabian,
> > >
> > > Thanks for bringing up this discussion and the nice approach to avoid
> > > overlapping contributions.
> > >
> > > All of these make sense to me. But I have some questions.
> > >
> > > Q1: If I understand correctly, we will not support TumbleRows and
> > > SessionRows at the beginning, but may support them as syntactic sugar
> > > (in the Table API) once SlideRows is supported in the future. Right?
> > >
> > > Q2: How can SessionRows be supported based on SlideRows? I don't see
> > > how to partition on "gap-separated" ranges.
> > >
> > > Q3: Should we break down the approach into smaller tasks for streaming
> > > tables and batch tables?
> > >
> > > Q4: The implementation of SlideRows still needs a custom operator that
> > > collects records in a priority queue ordered by the "rowtime", which is
> > > similar to the design we discussed in FLINK-4697, right?
> > >
> > > +1 for not supporting OVER ROW for event time at this point.
> > >
> > > Regards, Jark
> > >
> > >
> > > > On Jan 24, 2017, at 10:28 AM, Hongyuhong <[hidden email]> wrote:
> > > >
> > > > Hi,
> > > > We are also interested in streaming SQL and very willing to
> > > > participate and contribute.
> > > >
> > > > Our work is in progress, and we will also contribute to Calcite to
> > > > push forward the window and stream-join support.
> > > >
> > > >
> > > >
> > > > --------------
> > > > From: Fabian Hueske [mailto:[hidden email]]
> > > > Sent: January 24, 2017, 5:55
> > > > To: [hidden email]
> > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows
> > > > for streaming tables
> > > >
> > > > Hi Haohui,
> > > >
> > > > our plan was in fact to piggy-back on Calcite and use the TUMBLE
> > > > function [1] once it is available (CALCITE-1345 [2]).
> > > > Unfortunately, this issue does not seem to be very active, so I don't
> > > > know what the progress is.
> > > >
> > > > I would suggest moving the discussion about group windows to a
> > > > separate thread and keeping this one focused on the organization of
> > > > the SQL OVER windows.
> > > >
> > > > Best,
> > > > Fabian
> > > >
> > > > [1] http://calcite.apache.org/docs/stream.html
> > > > [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > > >
> > > > 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
> > > >
> > > >> Hi Fabian,
> > > >>
> > > >> FLINK-4692 has added the support for tumbling window and we are
> > > >> excited to try it out and expose it as a SQL construct.
> > > >>
> > > >> Just curious -- what are your thoughts on the SQL syntax for
> > > >> tumbling windows?
> > > >>
> > > >> Implementation-wise it might make sense to think of a tumbling
> > > >> window as a special case of the sliding window.
> > > >>
> > > >> The problem I see is that the OVER construct might be insufficient
> > > >> to support all the use cases of tumbling windows. For example, it
> > > >> fails to express tumbling windows that have fractional time units
> > > >> (as pointed out in http://calcite.apache.org/docs/stream.html).
> > > >>
> > > >> It looks to me that Calcite / Azure Stream Analytics have
> > > >> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address this
> > > >> issue.
> > > >>
> > > >> Do you think it is a good idea to follow the same conventions? Your
> > > >> ideas are appreciated.
> > > >>
> > > >> Regards,
> > > >> Haohui
> > > >>
> > > >>
> > > >> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <[hidden email]>
> > wrote:
> > > >>
> > > >>> +1
> > > >>>
> > > >>> We are also quite interested in these features and would love to
> > > >>> participate and contribute.
> > > >>>
> > > >>> ~Haohui
> > > >>>

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

jincheng sun
In reply to this post by Fabian Hueske-2
Hi Fabian,

Thank you for your reply. Yes, I agree with the point you mentioned about
"improving the efficiency of the implementation". I did not explain my
solution clearly in my last email. I meant to say that the existing window
mechanism can be used for bounded preceding but will not work for unbounded
preceding (for which we can use a ProcessFunction instead). My thoughts on
the possible solutions for OVER windows are as follows:

   -[The first solution]:

    BOUNDED PRECEDING uses the existing window mechanism, while UNBOUNDED
PRECEDING uses a ProcessFunction (however, ProcessFunction currently only
supports KeyedStream and therefore only works with PARTITION BY; we would
need to extend it to the DataStream API). The pros and cons of this
solution are:

    Pros: it does not need a sorted state, so we can implement it quickly.

    Cons: performance is a concern, as the same record has to be included
in and copied to many windows (the same as when using sliding windows).

   -[The second solution]:

    For both BOUNDED and UNBOUNDED, we use ProcessFunction + sorted state.
The pros and cons of this solution are:

    Pros: we can get good performance.

    Cons: we need sorted state support and additional effort to manage
the state, split the logical windows, and trigger the calculation. We also
cannot leverage the current optimizations of the window mechanism.
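
To make the second solution more concrete, here is a minimal sketch in plain
Python (not Flink code). It assumes a single partition key, a SUM aggregate,
and a watermark callback; the class and method names (SumAccumulator,
BoundedRangeOver, on_row, on_watermark) are made up for illustration. Rows
are buffered in a sorted list that stands in for the sorted state, and the
accumulator is updated incrementally with accumulate/retract, along the
lines of the accumulator and retract API discussed in this thread. Rows that
arrive after the watermark has passed their timestamp are not handled, and
rows with equal timestamps are processed one by one, which is a
simplification of strict SQL RANGE semantics.

```python
import bisect
from collections import deque


class SumAccumulator:
    """Hypothetical accumulator with accumulate/retract (names are assumed)."""

    def __init__(self):
        self.total = 0

    def accumulate(self, value):
        self.total += value

    def retract(self, value):
        self.total -= value


class BoundedRangeOver:
    """Sketch of SUM(value) OVER (ORDER BY ts RANGE BETWEEN `preceding`
    PRECEDING AND CURRENT ROW) for one partition key."""

    def __init__(self, preceding):
        self.preceding = preceding
        self.pending = []        # stand-in for sorted state: rows awaiting the watermark
        self.in_scope = deque()  # rows currently reflected in the accumulator
        self.acc = SumAccumulator()

    def on_row(self, ts, value):
        # Rows may arrive out of order; keep them sorted by timestamp.
        bisect.insort(self.pending, (ts, value))

    def on_watermark(self, watermark):
        # Emit one result per row whose timestamp is covered by the watermark.
        results = []
        while self.pending and self.pending[0][0] <= watermark:
            ts, value = self.pending.pop(0)
            self.acc.accumulate(value)
            self.in_scope.append((ts, value))
            # Retract rows that fell below the lower boundary of this row's window.
            while self.in_scope[0][0] < ts - self.preceding:
                _, old = self.in_scope.popleft()
                self.acc.retract(old)
            results.append((ts, self.acc.total))
        return results


op = BoundedRangeOver(preceding=10)
op.on_row(5, 1)
op.on_row(1, 2)   # out-of-order row, sorted into place
op.on_row(12, 4)
print(op.on_watermark(6))
print(op.on_watermark(20))
```

Each record is stored once and touched at most twice (one accumulate, one
retract), instead of being copied into every overlapping sliding window.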

    I noticed that recently, to better support aggregates, Stephan has added
some special optimizations to the window state and WindowedStream API, such
as FLINK-5590 (Create a proper internal state hierarchy) and FLINK-5582 (Add
a general distributive aggregate function). Similarly, to achieve the best
performance, could we add some special APIs to DataStream / KeyedStream
that are dedicated to OVER windows? Just my 2 cents.

Best,

SunJincheng

2017-01-24 22:25 GMT+08:00 Fabian Hueske <[hidden email]>:

> Hi SunJincheng,
>
> thanks a lot for your comments!
>
> regarding the suitability of DataStream sliding windows: You are right
> that UNBOUNDED PRECEDING windows cannot be implemented as DataStream
> sliding windows. The same is true for OVER RANGE windows.
> I think the only OVER windows that could be done with DataStream sliding
> windows are bounded OVER ROW windows (processing time). For the other
> window types, I was thinking about implementing them using a
> ProcessFunction. We might need additional support for sorted state to
> improve the efficiency of the implementation.
>
> The motivation for not supporting FOLLOWING in the first version was
> to keep the implementation simpler. I am not convinced that the best
> way to implement OVER windows is to use the DataStream window
> framework (window assigner, trigger, evictor). Using a more flexible
> framework (at the cost of additional implementation overhead) might pay off
> when we want to add more features.
>
> You are right, we probably need better support for sorted state. I think
> we will need this as well, when implementing the OVER RANGE windows which
> cannot be easily implemented in the DataStream window framework.
> A thorough design document is required here.
>
> Best,
> Fabian
>
>
> 2017-01-24 7:51 GMT+01:00 jincheng sun <[hidden email]>:
>
>> Hello Fabian,
>> Your plan looks good; I totally agree with your points.
>> While working on FLINK-4680, I had similar concerns about the
>> semantics of TumbleRows and SessionRows. It is much clearer if we define
>> these windows as SlideRows with a PARTITION BY clause.
>> Regarding the implementation plan for Table API row windows, I would
>> also like to share some thoughts on OVER windows that I gathered while
>> developing FLINK-4680:
>>
>> - Table API SlideRow windows (with the same restrictions as above). This
>> will be mostly API work since the execution part has been solved before.
>> Although the sliding window can work for bounded preceding, it is
>> not sufficient to support unbounded preceding. For instance, we may
>> potentially use SlidingProcessingTimeWindows and ProcessingTimeTrigger to
>> implement "OVER RANGE for processing time", but we still need to provide a
>> certain fixed window size, which is not suitable for unbounded preceding.
>> The same problem exists for "OVER RANGE for event time" and "OVER ROW for
>> processing time". Therefore, we need a new window assigner and trigger for
>> unbounded preceding, say SlideRowGlobalWindows and
>> SlideRowGlobalWindowXXXTrigger. What do you think?
>>
>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
>> If I understand you correctly, you want to implement the SlideRow windows
>> first without support for FOLLOWING (I guess you want to leverage the
>> existing SlidingProcessing(Event)TimeWindows and
>> Processing(Event)TimeTrigger?). IMO, when we implement SlideRow windows,
>> we could just provide a new WindowAssigner and trigger that support
>> both bounded preceding and following semantics (CURRENT ROW is just a
>> special case of FOLLOWING where the number of following rows is 0). What
>> do you think?
>>
>> - Add support for additional ORDER BY attributes (besides time).
>> This is an important and necessary part of OVER. But to achieve this,
>> we probably need a sorted state backend, maybe sortedMapstate? Is this
>> also included in your plan?
>>
>> Best,
>> SunJincheng
>>

Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Fabian Hueske-2
In reply to this post by Shaoxuan Wang
Hi everybody,

thanks for the great discussions so far. It's awesome to see so much
interest in this topic!

First, I'd like to comment on the development process for this feature and
later on the design of the runtime:

Dev Process
----
@Shaoxuan, I completely agree with you. We should first come up with good
designs for the runtime operators of the different window types. Once we
have that, we can start implementing the operators and integrate them with
Calcite's optimization. This will be an intermediate step and as a
byproduct give us support for SQL OVER windows. Once this is done, we can
extend the Table API and translate the Table API calls into the same
RelNodes as Calcite's SQL parser does.

Runtime Design
----
I think it makes sense to distinguish the different types of OVER windows
because they have different requirements which result in different runtime
implementations (with different implementation complexity and performance).
In a previous mail I proposed to split the support for OVER windows into
the following subtasks:

# bounded PRECEDING
- OVER ROWS for processing time
  - does not require sorted state (data always arrives in processing time
order)
  - no need to consider retraction (processing time is never late)
  - defines windows on row count.
  - A GlobalWindow with evictor + trigger might be the best implementation
(basically the same as DataStream.countWindow(long, long)). We need to add
timeouts to clean up state for non-used keys though.

- OVER RANGE for processing time
  - does not require sorted state (data always arrives in processing time
order)
  - no need to consider retraction (processing time is never late)
  - defines windows on a time range (not on row count)
  - I think this could also be implemented with a GlobalWindow with evictor
+ trigger (need to verify)

- OVER RANGE for event time
  - need for sorted state (late data possible)
  - IMO, a ProcessFunction gives us the most flexibility in adding later
features (retraction, update rate, etc.)
  - @Shaoxuan, you sketched a good design. Would you like to continue with
a design proposal?
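
As a rough illustration of the simplest case above (bounded OVER ROWS for
processing time), the following plain-Python sketch keeps a fixed-size
buffer per key and emits one aggregate per incoming row. It only mimics the
"evictor + trigger on every row" behavior; it is not Flink code, and the
names BoundedRowsOver and on_row are made up for this example. The state
timeout for non-used keys mentioned above is left out.

```python
from collections import defaultdict, deque


class BoundedRowsOver:
    """Sketch of SUM(value) OVER (PARTITION BY key ORDER BY proctime
    ROWS BETWEEN n PRECEDING AND CURRENT ROW)."""

    def __init__(self, preceding_rows):
        # The window holds the n preceding rows plus the current row.
        self.size = preceding_rows + 1
        self.buffers = defaultdict(lambda: deque(maxlen=self.size))

    def on_row(self, key, value):
        buf = self.buffers[key]
        buf.append(value)   # a full deque drops its oldest row (the "evictor")
        return sum(buf)     # a result is emitted for every row (the "trigger")


op = BoundedRowsOver(preceding_rows=2)
print([op.on_row("a", v) for v in [1, 2, 3, 4]])
print(op.on_row("b", 10))
```

No sorting is needed here because rows are processed in arrival order, which
is the processing-time order by definition.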

# UNBOUNDED PRECEDING
Similar considerations apply for the UNBOUNDED PRECEDING cases of the above
window types.
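
For the processing-time variants, UNBOUNDED PRECEDING needs no row buffer at
all: a running accumulator per key, kept for as long as the key is active,
is enough. The plain-Python sketch below shows this for COUNT and AVG; the
names UnboundedOver and on_row are hypothetical and this is not Flink code.

```python
from collections import defaultdict


class UnboundedOver:
    """Sketch of COUNT(*) and AVG(value) OVER (PARTITION BY key ORDER BY
    proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)."""

    def __init__(self):
        self.count = defaultdict(int)    # running row count per key
        self.total = defaultdict(float)  # running sum per key

    def on_row(self, key, value):
        # Only the accumulator is updated; no earlier rows are stored.
        self.count[key] += 1
        self.total[key] += value
        return self.count[key], self.total[key] / self.count[key]


op = UnboundedOver()
print(op.on_row("a", 2))
print(op.on_row("a", 4))
print(op.on_row("b", 10))
```

Since nothing is ever retracted, the state per key stays constant in size,
but it still has to be cleaned up for keys that are no longer used.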

If we all agree that the separation into six JIRAs (bounded/unbounded *
row-pt/range-pt/range-et) makes sense, I would suggest moving the
discussions about the design of the implementation to the individual JIRAs.

What do you think?

Best, Fabian

2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <[hidden email]>:

> Hi Liuxinchun,
> I am not sure where you got the impression that anyone has suggested "to
> process Event time window in Sliding Row Window". If you were referring to
> my post, there may be some misunderstanding. I think you were asking a
> similar question to Hongyuhong's. I have just replied to him. Please take a
> look and let me know if that makes sense to you. "Retraction" is an
> important building block for computing correct incremental results in
> streaming. It is another big topic that we should discuss in a separate
> thread.
>
> Regards,
> Shaoxuan
>
>
>
> > >   - bounded PRECEDING
> > >   - unbounded PRECEDING
> > >
> > > - streaming SQL OVER RANGE for event time
> > >   - bounded PRECEDING
> > >   - unbounded PRECEDING
> > >
> > > For each of these windows we need corresponding translation rules and
> > > execution code.
> > >
> > > Subsequent JIRAs would be
> > > - extending the Table API for supported SQL windows
> > > - add support for FOLLOWING
> > > - etc.
> > >
> > > Regarding the requirement for a sorted state. I am not sure if the
> > > OVER windows should be implemented using Flink's DataStream window
> > framework.
> > > We need a good design document to figure out what is the best
> > > approach. A ProcessFunction with a sorted state might be a good
> solution
> > as well.
> > >
> > > Best, Fabian
> > >
> > >
> > > 2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]>:
> > >
> > > > Hi all,
> > > >
> > > > Thanks for starting these discussion - it is very useful.
> > > > It does make sense indeed to refactor all these and coordinate a bit
> > > > the efforts not to have overlapping implementations and incompatible
> > > solutions.
> > > >
> > > > If you close the 3 jira issues you mentioned - do you plan to
> > > > redesign them and open new ones? Do you need help from our side - we
> > > > can also pick the redesign of some of these new jira issues. For
> > > > example we already
> > > have
> > > > an implementation for this and we can help with the design.
> > > > Nevertheless, let's coordinate the effort.
> > > >
> > > > Regarding the support for the different types of window - I think
> > > > the
> > > best
> > > > option is to split the implementation in small units. We can easily
> > > > do
> > > this
> > > > from the transformation rule class and with this each particular
> > > > type of window (session/sliding/sliderows/processing time/...) will
> > > > have a clear implementation and a corresponding architecture within
> > the jira issue?
> > > What
> > > > do you think about such a granularity?
> > > >
> > > > Regarding the issue of " Q4: The implementaion of SlideRows still
> > > > need a custom operator that collects records in a priority queue
> > > > ordered by the "rowtime", which is similar to the design we
> > > > discussed in FLINK-4697, right? "
> > > > Why would you need this operator? The window buffer can act to some
> > > extent
> > > > as a priority queue as long as the trigger and evictor is set to
> > > > work
> > > based
> > > > on the rowtime - or maybe I am missing something... Can you please
> > > clarify
> > > > this.
> > > >
> > > >
> > > > Dr. Radu Tudoran
> > > > Senior Research Engineer - Big Data Expert IT R&D Division
> > > >
> > > >
> > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > European Research Center
> > > > Riesstrasse 25, 80992 München
> > > >
> > > > E-mail: [hidden email]
> > > > Mobile: +49 15209084330
> > > > Telephone: +49 891588344173
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: Jark Wu [mailto:[hidden email]]
> > > > Sent: Tuesday, January 24, 2017 6:53 AM
> > > > To: [hidden email]
> > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
> Windows
> > > for
> > > > streaming tables
> > > >
> > > > Hi Fabian,
> > > >
> > > > Thanks for bringing up this discussion and the nice approach to avoid
> > > > overlapping contributions.
> > > >
> > > > All of these make sense to me. But I have some questions.
> > > >
> > > > Q1: If I understand correctly, we will not support TumbleRows and
> > > > SessionRows at the beginning. But maybe support them as a syntax
> sugar
> > > (in
> > > > Table API) when the SlideRows is supported in the future. Right ?
> > > >
> > > > Q2: How to support SessionRows based on SlideRows ?  I don't get how
> to
> > > > partition on "gap-separated".
> > > >
> > > > Q3: Should we break down the approach into smaller tasks for
> streaming
> > > > tables and batch tables ?
> > > >
> > > > > Q4: The implementation of SlideRows still needs a custom operator that
> > > > collects records in a priority queue ordered by the "rowtime", which
> is
> > > > similar to the design we discussed in FLINK-4697, right?
> > > >
> > > > +1 not support for OVER ROW for event time at this point.
> > > >
> > > > Regards, Jark
> > > >
> > > >
> > > > > > On January 24, 2017, at 10:28 AM, Hongyuhong <[hidden email]> wrote:
> > > > >
> > > > > Hi,
> > > > > We are also interested in streaming sql and very willing to
> > participate
> > > > and contribute.
> > > > >
> > > > > We are now in progress and we will also contribute to calcite to
> push
> > > > forward the window and stream-join support.
> > > > >
> > > > >
> > > > >
> > > > > --------------
> > > > > > Sender: Fabian Hueske [mailto:[hidden email]] Send Time: January 24, 2017 5:55
> > > > > Receiver: [hidden email]
> > > > > Theme: Re: [DISCUSS] Development of SQL OVER / Table API Row
> Windows
> > > > > for streaming tables
> > > > >
> > > > > Hi Haohui,
> > > > >
> > > > > our plan was in fact to piggy-back on Calcite and use the TUMBLE
> > > > > function [1] once it is available (CALCITE-1345 [2]).
> > > > > Unfortunately, this issue does not seem to be very active, so I
> don't
> > > > know what the progress is.
> > > > >
> > > > > I would suggest to move the discussion about group windows to a
> > > separate
> > > > thread and keep this one focused on the organization of the SQL OVER
> > > > windows.
> > > > >
> > > > > Best,
> > > > > Fabian
> > > > >
> > > > > > [1] http://calcite.apache.org/docs/stream.html
> > > > > [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > > > >
> > > > > 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
> > > > >
> > > > >> Hi Fabian,
> > > > >>
> > > > >> FLINK-4692 has added the support for tumbling window and we are
> > > > >> excited to try it out and expose it as a SQL construct.
> > > > >>
> > > > >> Just curious -- what's your thought on the SQL syntax on tumbling
> > > > window?
> > > > >>
> > > > >> Implementation wise it might make sense to think tumbling window
> as
> > a
> > > > >> special case of the sliding window.
> > > > >>
> > > > >> The problem I see is that the OVER construct might be insufficient
> > to
> > > > >> support all the use cases of tumbling windows. For example, it
> fails
> > > > >> to express tumbling windows that have fractional time units (as
> > > > >> pointed out in http://calcite.apache.org/docs/stream.html).
> > > > >>
> > > > >> It looks to me that the Calcite / Azure Stream Analytics have
> > > > >> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address
> this
> > > > issue.
> > > > >>
> > > > >> Do you think it is a good idea to follow the same conventions?
> Your
> > > > >> ideas are appreciated.
> > > > >>
> > > > >> Regards,
> > > > >> Haohui
> > > > >>
> > > > >>
> > > > >> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <[hidden email]>
> > > wrote:
> > > > >>
> > > > >>> +1
> > > > >>>
> > > > >>> We are also quite interested in these features and would love to
> > > > >>> participate and contribute.
> > > > >>>
> > > > >>> ~Haohui
> > > > >>>
> > > > >>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <[hidden email]
> >
> > > > wrote:
> > > > >>>
> > > > >>>> Hi everybody,
> > > > >>>>
> > > > >>>> it seems that currently several contributors are working on new
> > > > >>>> features for the streaming Table API / SQL around row windows
> (as
> > > > >>>> defined in
> > > > >>>> FLIP-11
> > > > >>>> [1]) and SQL OVER-style window (FLINK-4678, FLINK-4679,
> > FLINK-4680,
> > > > >>>> FLINK-5584).
> > > > >>>> Since these efforts overlap quite a bit I spent some time
> thinking
> > > > >>>> about how we can approach these features and how to avoid
> > > > >>>> overlapping contributions.
> > > > >>>>
> > > > >>>> The challenge here is the following. Some of the Table API row
> > > > >>>> windows
> > > > >> as
> > > > >>>> defined by FLIP-11 [1] are basically SQL OVER windows while
> other
> > > > >>>> cannot be easily expressed as such (TumbleRows for row-count
> > > > >>>> intervals, SessionRows).
> > > > >>>> However, since Calcite already supports SQL OVER windows, we can
> > > > >>>> reuse
> > > > >> the
> > > > >>>> optimization logic for some of the Table API row windows. I also
> > > > >>>> thought about the semantics of the TumbleRows and SessionRows
> > > > >>>> windows as defined in
> > > > >>>> FLIP-11 and came to the conclusion that these are not well
> defined
> > > > >>>> in
> > > > >>>> FLIP-11 and should rather be defined as SlideRows windows with a
> > > > >>>> special PARTITION BY clause.
> > > > >>>>
> > > > >>>> I propose to approach SQL OVER windows and Table API row windows
> > as
> > > > >>>> follows:
> > > > >>>>
> > > > >>>> We start with three simple cases for SQL OVER windows (not Table
> > > > >>>> API
> > > > >> yet):
> > > > >>>>
> > > > >>>> * OVER RANGE for event time
> > > > >>>> * OVER RANGE for processing time
> > > > >>>> * OVER ROW for processing time
> > > > >>>>
> > > > >>>> All cases fulfill the following restrictions:
> > > > >>>> - All aggregations in SELECT must refer to the same window.
> > > > >>>> - PARTITION BY may not contain the rowtime attribute.
> > > > >>>> - ORDER BY must be on rowtime attribute (for event time) or on a
> > > > >>>> marker function that indicates processing time. Additional sort
> > > > >>>> attributes are not supported initially.
> > > > >>>> - only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and
> "BETWEEN
> > x
> > > > >>>> PRECEDING AND CURRENT ROW" are supported.
> > > > >>>>
> > > > >>>> OVER ROW for event time cannot be easily supported. With event
> > > > >>>> time, we may have late records which need to be injected into
> the
> > > > >>>> order of records.
> > > > >>>> When
> > > > >>>> a record in injected in to the order where a row-count window
> has
> > > > >> already
> > > > >>>> been computed, this and all following windows will change. We
> > could
> > > > >> either
> > > > >>>> drop the record or sent out many retraction records. I think it
> is
> > > > >>>> best
> > > > >> to
> > > > >>>> not open this can of worms at this point.
> > > > >>>>
> > > > >>>> The rational for all of the above restrictions is to have first
> > > > >>>> versions of OVER windows soon.
> > > > >>>> Once we have the above cases covered we can extend and remove
> > > > >> limitations
> > > > >>>> as follows:
> > > > >>>>
> > > > >>>> - Table API SlideRow windows (with the same restrictions as
> > above).
> > > > >>>> This will be mostly API work since the execution part has been
> > > solved
> > > > before.
> > > > >>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> > > > >>>> - Add support for different windows in SELECT. All windows must
> be
> > > > >>>> partitioned and ordered in the same way.
> > > > >>>> - Add support for additional ORDER BY attributes (besides time).
> > > > >>>>
> > > > >>>> As I said before, TumbleRows and SessionRows windows as in
> FLIP-11
> > > > >>>> are
> > > > >> not
> > > > >>>> well defined, IMO.
> > > > >>>> They can be expressed as SlideRows windows with special
> > > > >>>> partitioning (partitioning on fixed, non-overlapping time ranges
> > > > >>>> for TumbleRows, and gap-separated, non-overlapping time ranges
> for
> > > > >>>> SessionRows) I would not start to work on those yet.
> > > > >>>>
> > > > >>>> I would like to close all related JIRA issues (FLINK-4678,
> > > > >>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the
> > development
> > > > >>>> of these
> > > > >> features
> > > > >>>> as outlined above with corresponding JIRA issues.
> > > > >>>>
> > > > >>>> What do others think? (I cc'ed the contributors assigned to the
> > > > >>>> above
> > > > >> JIRA
> > > > >>>> issues)
> > > > >>>>
> > > > >>>> Best, Fabian
> > > > >>>>
> > > > >>>> [1]
> > > > >>>>
> > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> > > > >> 11%3A+Table+API+Stream+Aggregations
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > > >
> > >
> >
>

RE: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Radu Tudoran
Hi,

I think it is a good idea to open these proposed Jira issues.
I also like the approach of separating the use of windows (GlobalWindows) and other operators based on the specific type of translation.
One question regarding the design: should we build one translation rule for each such implementation, or construct one general rule for windows and have a factory method that brings up the right operator implementation? In the latter case, perhaps it makes sense to create from the beginning an abstract class / interface that can be accepted and run by the WindowRunner.
What do you think?
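
[Editor's illustration] The "one general rule plus factory method" option asked about above could look roughly like the following. This is a minimal sketch in plain Python, not Flink code: the names OverWindowOperator, BoundedRowsSum, UnboundedRowsSum and create_operator are hypothetical, and only a SUM over a ROWS frame is covered.

```python
from abc import ABC, abstractmethod
from collections import deque


class OverWindowOperator(ABC):
    """Hypothetical common interface that a single runner could accept."""

    @abstractmethod
    def process(self, value):
        """Consume one row and return the aggregate for that row's window."""


class BoundedRowsSum(OverWindowOperator):
    """SUM over 'x PRECEDING AND CURRENT ROW' (row count frame)."""

    def __init__(self, preceding):
        # The frame holds the x preceding rows plus the current row.
        self.rows = deque(maxlen=preceding + 1)

    def process(self, value):
        self.rows.append(value)  # a full deque drops its oldest row
        return sum(self.rows)


class UnboundedRowsSum(OverWindowOperator):
    """SUM over 'UNBOUNDED PRECEDING AND CURRENT ROW'."""

    def __init__(self):
        self.total = 0

    def process(self, value):
        self.total += value
        return self.total


def create_operator(frame, preceding=None):
    """Factory method: the general rule asks it for the matching operator."""
    if frame != "ROWS":
        raise ValueError(f"unsupported frame type: {frame}")
    if preceding is None:  # None stands for UNBOUNDED PRECEDING
        return UnboundedRowsSum()
    return BoundedRowsSum(preceding)
```

With one translation rule per implementation, each rule would construct its operator directly; with the factory, the rule only passes the frame description on, and the runner depends on the shared interface alone.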

-----Original Message-----
From: Fabian Hueske [mailto:[hidden email]]
Sent: Wednesday, January 25, 2017 10:55 AM
To: [hidden email]
Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables

Hi everybody,

thanks for the great discussions so far. It's awesome to see so much interest in this topic!

First, I'd like to comment on the development process for this feature and later on the design of the runtime:

Dev Process
----
@Shaoxuan, I completely agree with you. We should first come up with good designs for the runtime operators of the different window types. Once we have that, we can start implementing the operators and integrate them with Calcite's optimization. This will be an intermediate step and as a byproduct give us support for SQL OVER windows. Once this is done, we can extend the Table API and translate the Table API calls into the same RelNodes as Calcite's SQL parser does.

Runtime Design
----
I think it makes sense to distinguish the different types of OVER windows because they have different requirements which result in different runtime implementations (with different implementation complexity and performance).
In a previous mail I proposed to split the support for OVER windows into the following subtasks:

# bounded PRECEDING
- OVER ROWS for processing time
  - does not require sorted state (data always arrives in processing time order)
  - no need to consider retraction (processing time is never late)
  - defines windows on row count
  - A GlobalWindow with evictor + trigger might be the best implementation (basically the same as DataStream.countWindow(long, long)). We need to add timeouts to clean up state for unused keys, though.

- OVER RANGE for processing time
  - does not require sorted state (data always arrives in processing time order)
  - no need to consider retraction (processing time is never late)
  - defines windows on a time range
  - I think this could also be implemented with a GlobalWindow with evictor + trigger (need to verify)

- OVER RANGE for event time
  - need for sorted state (late data possible)
  - IMO, a ProcessFunction gives us the most flexibility in adding later features (retraction, update rate, etc.)
  - @Shaoxuan, you sketched a good design. Would you like to continue with a design proposal?
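
[Editor's illustration] For the event-time case, the combination of a sorted state with an accumulate/retract accumulator might be sketched as follows. This is plain Python under stated assumptions, not the design that was eventually proposed: the class EventTimeRangeSum and its methods are hypothetical, it handles a single key, timestamps are assumed to be distinct, rows behind the last watermark are simply dropped, and state cleanup is omitted.

```python
import bisect


class EventTimeRangeSum:
    """SUM(value) over RANGE BETWEEN `preceding` PRECEDING AND CURRENT ROW."""

    def __init__(self, preceding):
        self.preceding = preceding
        self.rows = []       # "sorted state": (timestamp, value) in time order
        self.emitted = 0     # rows[:emitted] already produced a result
        self.lower = 0       # index of the first row still inside the frame
        self.acc = 0         # the aggregate accumulator
        self.watermark = float("-inf")

    def insert(self, timestamp, value):
        """Buffer a row; returns False if it is too late (assumed policy)."""
        if timestamp <= self.watermark:
            return False
        # Out-of-order rows are placed at their event-time position.
        bisect.insort(self.rows, (timestamp, value))
        return True

    def on_watermark(self, watermark):
        """Emit (timestamp, sum) for every row that is now complete."""
        self.watermark = watermark
        results = []
        while (self.emitted < len(self.rows)
               and self.rows[self.emitted][0] <= watermark):
            ts, value = self.rows[self.emitted]
            self.acc += value  # accumulate the current row
            while self.rows[self.lower][0] < ts - self.preceding:
                self.acc -= self.rows[self.lower][1]  # retract expired row
                self.lower += 1
            results.append((ts, self.acc))
            self.emitted += 1
        return results
```

The accumulator is carried from one row's window to the next, so each row is accumulated once and retracted at most once, instead of being added to every open window.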

# UNBOUNDED PRECEDING
Similar considerations apply for the UNBOUNDED PRECEDING cases of the above window types.
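
[Editor's illustration] As one example of how a bounded and an unbounded variant relate, here is a minimal sketch for OVER RANGE in processing time. It is plain Python, not Flink code: the class ProcTimeRangeSum is hypothetical, it handles a single key, and the caller passes the processing time explicitly so that the behaviour is reproducible.

```python
from collections import deque


class ProcTimeRangeSum:
    """SUM(value) over a processing-time RANGE frame ending at CURRENT ROW."""

    def __init__(self, preceding=None):
        self.preceding = preceding  # None stands for UNBOUNDED PRECEDING
        self.rows = deque()         # (proctime, value); arrival order is time order
        self.acc = 0

    def process(self, proctime, value):
        self.acc += value
        if self.preceding is not None:
            # Only the bounded case has to remember rows for later retraction.
            self.rows.append((proctime, value))
            while self.rows[0][0] < proctime - self.preceding:
                self.acc -= self.rows.popleft()[1]
        return self.acc
```

In this sketch the unbounded case keeps only the accumulator and no rows, which is why it never retracts anything; the bounded case needs no sorting because rows arrive in processing-time order.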

If we all agree that the separation into six JIRAs (bounded/unbounded * row-pt/range-pt/range-et) makes sense, I would suggest moving the discussions about the design of the implementation to the individual JIRAs.

What do you think?

Best, Fabian

2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <[hidden email]>:

> Hi Liuxinchun,
> I am not sure where you got the impression that anyone has suggested
> "to process Event time window in Sliding Row Window". If you were
> referring to my post, there may be some misunderstanding there. I think
> you were asking a similar question to Hongyuhong's. I have just
> replied to him. Please take a look and let me know if that makes sense
> to you. "Retraction" is an important building block to compute correct
> incremental results in streaming. It is another big topic; we should
> discuss it in another thread.
>
> Regards,
> Shaoxuan
>
>
>
> On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <[hidden email]> wrote:
>
> > I don't think it is a good idea to process event-time windows in a
> > Sliding Row Window. In a Sliding Time Window, when an element is late,
> > we can trigger the recalculation of the related windows. The sliding
> > period is coarse-grained, so we only need to recalculate (size / slide)
> > windows. But in a Sliding Row Window, the calculation is triggered for
> > every arriving element, so the sliding period becomes fine-grained. When
> > an element is late, a very large number of "windows" is affected. Even
> > if we store all the raw data, the amount of computation is very large.
> >
> > I wonder whether it is possible to define a standard for sliding
> > event-time Row Windows: when certain elements are late, we recalculate
> > only some of the windows and permit some error. For example, we could
> > recalculate only the windows that end in the range
> > (lateElement.timestamp - leftDelta, lateElement.timestamp] and those
> > that begin in the range
> > [lateElement.timestamp, lateElement.timestamp + rightDelta).
> > ////////////////////////////////////////////////////////////
> > //////////////////////////
> >  Hi everyone,
> > Thanks for this great discussion, and glad to see more and more
> > people
> are
> > interested on stream SQL & tableAPI.
> >
> > IMO, the key problems for Over window design are the SQL semantics
> > and
> the
> > runtime design. I totally agree with Fabian that we should skip the
> design
> > of TumbleRows and SessionRows windows for now, as they are not well
> defined
> > in SQL semantics.
> >
> > Runtime design is the most crucial part we are interested in and
> > volunteered to contribute into. We have thousands of machines
> > running
> flink
> > streaming jobs. The costs in terms of CPU, memory, and state are the
> vital
> > factors that we have to take into account. We have been working on
> > the design of OVER window in the past months, and planning to send
> > out a detailed design doc to DEV quite soon. But since Fabian
> > started a good discussion on OVER window, I would like to share our
> > ideas/thoughts about the runtime design for OVER window.
> >
> >    1. As SunJincheng pointed out earlier, sliding window does not
> > work
> for
> >    unbounded preceding, we need alternative approach for unbound
> > over window.
> >    2. Though sliding window may work for some cases of bounded window,
> >    it is not very efficient thereby should not be used for
> > production. To the
> >    best of my understanding, the current runtime implementation of
> sliding
> >    window has not leveraged the concepts of state Panes yet. This
> > means that
> >    if we use sliding window for OVER window,  there will be a
> > backend
> state
> >    created per each group (partition by) and each row, and whenever a new
> >    record arrives, it will be accumulated to all the existing
> > windows
> that
> > has
> >    not been closed. This would cause quite a lot of overhead in
> > terms of both
> >    CPU and memory&state.
> >    3. Fabian has mentioned an approach of leveraging “ProcessFunction”
> and
> >    a “sortedState”. I like this idea. The design details on this are
> > not quite
> >    clear yet. So I would like to add more thoughts on this. Regardless
> >    which dataStream API we are going to use (it is very likely that
> > we
> need
> >    a new API), we should come out with an optimal approach. The
> > purpose
> of
> >    grouping window and over window is to partition the data, such
> > that we can
> >    generate the aggregate results. So when we talk about the design
> > of
> OVER
> >    window, we have to think about the aggregates. As we proposed in
> > our recent
> >    UDAGG doc https://goo.gl/6ntclB,  the user defined accumulator
> > will
> be
> >    stored in the aggregate state. Besides accumulator, we have also
> > introduced
> >    a retract API for UDAGG. With aggregate accumulator and retract
> > API, I am
> >    proposing a runtime approach to implement the OVER window as
> followings.
> >    4.
> >       - We first implement a sorted state interface
> >       - Per each group, we just create one sorted state. When a new
> record
> >       arrives, it will insert into this sorted state, in the
> > meanwhile it will be
> >       accumulated to the aggregate accumulator.
> >       - For over window, we keep the aggregate accumulator for the entire
> >       job lifelong time. This is different than the case where we
> > delete the
> >       accumulator for each group/window when a grouping-window is
> finished.
> >       - When an over window is up to trigger, we grab the
> >       previous accumulator from the state and accumulate values onto
> > it with all
> >       the records till the upperBoundary of the current window, and
> > retract all
> >       the out of scope records till its lowerBoundary. We emit the
> >       aggregate result and save the accumulator for the next window.
> >
> >
> > Hello Fabian,
> > I would suggest we should first start working on runtime design of
> > over window and aggregate. Once we have a good design there, one can
> > easily
> add
> > the support for SQL as well as tableAPI. What do you think?
> >
> > Regards,
> > Shaoxuan
> >
> > On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <[hidden email]>
> wrote:
> >
> > > Hi Radu,
> > >
> > > thanks for your comments!
> > >
> > > Yes, my intention is to open new JIRA issues to structure the
> > > development process. Everybody is very welcome to pick up issues
> > > and discuss the design proposals.
> > > At the moment I see the following six issues to start with:
> > >
> > > - streaming SQL OVER ROW for processing time
> > >   - bounded PRECEDING
> > >   - unbounded PRECEDING
> > >
> > > - streaming SQL OVER RANGE for processing time
> > >   - bounded PRECEDING
> > >   - unbounded PRECEDING
> > >
> > > - streaming SQL OVER RANGE for event time
> > >   - bounded PRECEDING
> > >   - unbounded PRECEDING
> > >
> > > For each of these windows we need corresponding translation rules
> > > and execution code.
> > >
> > > Subsequent JIRAs would be
> > > - extending the Table API for supported SQL windows
> > > - add support for FOLLOWING
> > > - etc.
> > >
> > > Regarding the requirement for a sorted state. I am not sure if the
> > > OVER windows should be implemented using Flink's DataStream window
> > framework.
> > > We need a good design document to figure out what is the best
> > > approach. A ProcessFunction with a sorted state might be a good
> solution
> > as well.
> > >
> > > Best, Fabian
> > >
> > >
> > > 2017-01-24 10:41 GMT+01:00 Radu Tudoran <[hidden email]>:
> > >
> > > > Hi all,
> > > >
> > > > Thanks for starting these discussion - it is very useful.
> > > > It does make sense indeed to refactor all these and coordinate a
> > > > bit the efforts not to have overlapping implementations and
> > > > incompatible
> > > solutions.
> > > >
> > > > If you close the 3 jira issues you mentioned - do you plan to
> > > > redesign them and open new ones? Do you need help from our side
> > > > - we can also pick the redesign of some of these new jira
> > > > issues. For example we already
> > > have
> > > > an implementation for this and we can help with the design.
> > > > Nevertheless, let's coordinate the effort.
> > > >
> > > > Regarding the support for the different types of window - I
> > > > think the
> > > best
> > > > option is to split the implementation in small units. We can
> > > > easily do
> > > this
> > > > from the transformation rule class and with this each particular
> > > > type of window (session/sliding/sliderows/processing time/...)
> > > > will have a clear implementation and a corresponding
> > > > architecture within
> > the jira issue?
> > > What
> > > > do you think about such a granularity?
> > > >
> > > > Regarding the issue of " Q4: The implementaion of SlideRows
> > > > still need a custom operator that collects records in a priority
> > > > queue ordered by the "rowtime", which is similar to the design
> > > > we discussed in FLINK-4697, right? "
> > > > Why would you need this operator? The window buffer can act to
> > > > some
> > > extent
> > > > as a priority queue as long as the trigger and evictor is set to
> > > > work
> > > based
> > > > on the rowtime - or maybe I am missing something... Can you
> > > > please
> > > clarify
> > > > this.
> > > >
> > > >
> > > > Dr. Radu Tudoran
> > > > Senior Research Engineer - Big Data Expert IT R&D Division
> > > >
> > > >
> > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH European Research Center
> > > > Riesstrasse 25, 80992 München
> > > >
> > > > E-mail: [hidden email]
> > > > Mobile: +49 15209084330
> > > > Telephone: +49 891588344173
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: Jark Wu [mailto:[hidden email]]
> > > > Sent: Tuesday, January 24, 2017 6:53 AM
> > > > To: [hidden email]
> > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
> Windows
> > > for
> > > > streaming tables
> > > >
> > > > Hi Fabian,
> > > >
> > > > Thanks for bringing up this discussion and the nice approach to
> > > > avoid overlapping contributions.
> > > >
> > > > All of these make sense to me. But I have some questions.
> > > >
> > > > Q1: If I understand correctly, we will not support TumbleRows
> > > > and SessionRows at the beginning. But maybe support them as a
> > > > syntax
> sugar
> > > (in
> > > > Table API) when the SlideRows is supported in the future. Right ?
> > > >
> > > > Q2: How to support SessionRows based on SlideRows ?  I don't get
> > > > how
> to
> > > > partition on "gap-separated".
> > > >
> > > > Q3: Should we break down the approach into smaller tasks for
> streaming
> > > > tables and batch tables ?
> > > >
> > > > > Q4: The implementation of SlideRows still needs a custom operator
> > > > that collects records in a priority queue ordered by the
> > > > "rowtime", which
> is
> > > > similar to the design we discussed in FLINK-4697, right?
> > > >
> > > > +1 not support for OVER ROW for event time at this point.
> > > >
> > > > Regards, Jark
> > > >
> > > >
> > > > > > On January 24, 2017, at 10:28 AM, Hongyuhong <[hidden email]> wrote:
> > > > >
> > > > > Hi,
> > > > > We are also interested in streaming sql and very willing to
> > participate
> > > > and contribute.
> > > > >
> > > > > We are now in progress and we will also contribute to calcite
> > > > > to
> push
> > > > forward the window and stream-join support.
> > > > >
> > > > >
> > > > >
> > > > > --------------
> > > > > > Sender: Fabian Hueske [mailto:[hidden email]] Send Time: January 24, 2017 5:55
> > > > > Receiver: [hidden email]
> > > > > Theme: Re: [DISCUSS] Development of SQL OVER / Table API Row
> Windows
> > > > > for streaming tables
> > > > >
> > > > > Hi Haohui,
> > > > >
> > > > > our plan was in fact to piggy-back on Calcite and use the
> > > > > TUMBLE
> > > > > function [1] once it is available (CALCITE-1345 [2]).
> > > > > Unfortunately, this issue does not seem to be very active, so
> > > > > I
> don't
> > > > know what the progress is.
> > > > >
> > > > > I would suggest to move the discussion about group windows to
> > > > > a
> > > separate
> > > > thread and keep this one focused on the organization of the SQL
> > > > OVER windows.
> > > > >
> > > > > Best,
> > > > > Fabian
> > > > >
> > > > > > [1] http://calcite.apache.org/docs/stream.html
> > > > > [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > > > >
> > > > > 2017-01-23 22:42 GMT+01:00 Haohui Mai <[hidden email]>:
> > > > >
> > > > >> Hi Fabian,
> > > > >>
> > > > >> FLINK-4692 has added the support for tumbling window and we
> > > > >> are excited to try it out and expose it as a SQL construct.
> > > > >>
> > > > >> Just curious -- what's your thought on the SQL syntax on
> > > > >> tumbling
> > > > window?
> > > > >>
> > > > >> Implementation wise it might make sense to think tumbling
> > > > >> window
> as
> > a
> > > > >> special case of the sliding window.
> > > > >>
> > > > > >> The problem I see is that the OVER construct might be insufficient
> > > > > >> to support all the use cases of tumbling windows. For example, it
> > > > > >> fails to express tumbling windows that have fractional time units
> > > > > >> (as pointed out in http://calcite.apache.org/docs/stream.html).
> > > > >>
> > > > > >> It looks to me like Calcite and Azure Stream Analytics have
> > > > > >> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address
> > > > > >> this issue.
> > > > > >>
> > > > > >> Do you think it is a good idea to follow the same conventions? Your
> > > > > >> ideas are appreciated.
> > > > >>
> > > > >> Regards,
> > > > >> Haohui
> > > > >>
> > > > >>
> > > > > >> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <[hidden email]> wrote:
> > > > >>
> > > > >>> +1
> > > > >>>
> > > > >>> We are also quite interested in these features and would
> > > > >>> love to participate and contribute.
> > > > >>>
> > > > >>> ~Haohui
> > > > >>>
> > > > > >>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <[hidden email]> wrote:
> > > > >>>
> > > > >>>> Hi everybody,
> > > > >>>>
> > > > > >>>> it seems that currently several contributors are working on new
> > > > > >>>> features for the streaming Table API / SQL around row windows (as
> > > > > >>>> defined in FLIP-11 [1]) and SQL OVER-style windows (FLINK-4678,
> > > > > >>>> FLINK-4679, FLINK-4680, FLINK-5584).
> > > > > >>>> Since these efforts overlap quite a bit, I spent some time thinking
> > > > > >>>> about how we can approach these features and how to avoid
> > > > > >>>> overlapping contributions.
> > > > >>>>
> > > > > >>>> The challenge here is the following. Some of the Table API row
> > > > > >>>> windows as defined by FLIP-11 [1] are basically SQL OVER windows,
> > > > > >>>> while others cannot be easily expressed as such (TumbleRows for
> > > > > >>>> row-count intervals, SessionRows).
> > > > > >>>> However, since Calcite already supports SQL OVER windows, we can
> > > > > >>>> reuse the optimization logic for some of the Table API row windows.
> > > > > >>>> I also thought about the semantics of the TumbleRows and
> > > > > >>>> SessionRows windows and came to the conclusion that they are not
> > > > > >>>> well defined in FLIP-11 and should rather be defined as SlideRows
> > > > > >>>> windows with a special PARTITION BY clause.
> > > > >>>>
> > > > > >>>> I propose to approach SQL OVER windows and Table API row windows as
> > > > > >>>> follows:
> > > > > >>>>
> > > > > >>>> We start with three simple cases for SQL OVER windows (not Table
> > > > > >>>> API yet):
> > > > >>>>
> > > > >>>> * OVER RANGE for event time
> > > > >>>> * OVER RANGE for processing time
> > > > >>>> * OVER ROW for processing time
> > > > >>>>
> > > > >>>> All cases fulfill the following restrictions:
> > > > >>>> - All aggregations in SELECT must refer to the same window.
> > > > >>>> - PARTITION BY may not contain the rowtime attribute.
> > > > > >>>> - ORDER BY must be on the rowtime attribute (for event time) or on
> > > > > >>>> a marker function that indicates processing time. Additional sort
> > > > > >>>> attributes are not supported initially.
> > > > > >>>> - Only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and "BETWEEN x
> > > > > >>>> PRECEDING AND CURRENT ROW" are supported.
> > > > >>>>
> > > > > >>>> OVER ROW for event time cannot be easily supported. With event
> > > > > >>>> time, we may have late records which need to be inserted into the
> > > > > >>>> order of records.
> > > > > >>>> When a record is inserted into the order at a position where a
> > > > > >>>> row-count window has already been computed, this and all following
> > > > > >>>> windows will change. We could either drop the record or send out
> > > > > >>>> many retraction records. I think it is best not to open this can
> > > > > >>>> of worms at this point.
> > > > >>>>
> > > > > >>>> The rationale for all of the above restrictions is to have first
> > > > > >>>> versions of OVER windows soon.
> > > > > >>>> Once we have the above cases covered, we can extend and remove
> > > > > >>>> limitations as follows:
> > > > > >>>>
> > > > > >>>> - Table API SlideRow windows (with the same restrictions as above).
> > > > > >>>> This will be mostly API work since the execution part has been
> > > > > >>>> solved before.
> > > > > >>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> > > > > >>>> - Add support for different windows in SELECT. All windows must be
> > > > > >>>> partitioned and ordered in the same way.
> > > > > >>>> - Add support for additional ORDER BY attributes (besides time).
> > > > >>>>
> > > > > >>>> As I said before, TumbleRows and SessionRows windows as in FLIP-11
> > > > > >>>> are not well defined, IMO.
> > > > > >>>> They can be expressed as SlideRows windows with special
> > > > > >>>> partitioning (partitioning on fixed, non-overlapping time ranges
> > > > > >>>> for TumbleRows, and gap-separated, non-overlapping time ranges for
> > > > > >>>> SessionRows). I would not start to work on those yet.
> > > > >>>>
> > > > > >>>> I would like to close all related JIRA issues (FLINK-4678,
> > > > > >>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the development
> > > > > >>>> of these features as outlined above with corresponding JIRA issues.
> > > > > >>>>
> > > > > >>>> What do others think? (I cc'ed the contributors assigned to the
> > > > > >>>> above JIRA issues)
> > > > >>>>
> > > > >>>> Best, Fabian
> > > > >>>>
> > > > >>>> [1]
> > > > >>>>
> > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> > > > >> 11%3A+Table+API+Stream+Aggregations
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > > >
> > >
> >
>
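
To make the late-record concern in the quoted proposal concrete, here is a minimal sketch in plain Python (not Flink code). It assumes records are `(event_time, value)` pairs and that the aggregate is a SUM over `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW`; the function name `over_rows_sum` and the sample data are hypothetical and only serve the illustration.

```python
def over_rows_sum(records, preceding):
    """Sum `value` over ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW,
    with rows ordered by event time. `records` holds (event_time, value)."""
    ordered = sorted(records)  # order rows by event time
    results = []
    for i, (ts, _) in enumerate(ordered):
        start = max(0, i - preceding)  # first row of the row-count window
        results.append((ts, sum(v for _, v in ordered[start:i + 1])))
    return results


# Results emitted from the records that arrived on time.
on_time = [(1, 10), (2, 20), (4, 40), (5, 50)]
before = over_rows_sum(on_time, preceding=2)

# A late record with event time 3 has to be placed between rows 2 and 4.
after = over_rows_sum(on_time + [(3, 30)], preceding=2)

# Event times whose previously emitted result is no longer correct.
emitted = dict(before)
changed = [ts for ts, total in after if ts in emitted and emitted[ts] != total]
print(changed)
```

In this sketch the results for the rows after the late record differ from what was already emitted, so a system would have to either drop the late record or retract and re-emit those rows. With a bounded window only the next few rows are affected; with UNBOUNDED PRECEDING every later row would change.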