Hi all!
Fabian and I worked on a FLIP for Stream Aggregations in the Table API. You can find FLIP-11 here:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations

Motivation for the FLIP:

The Table API is a declarative API to define queries on static and streaming tables. So far, only projection, selection, and union are supported operations on streaming tables.

This FLIP proposes to add support for different types of aggregations on top of streaming tables. In particular, we seek to support:

- Group-window aggregates, i.e., aggregates which are computed for a group of elements. A (time or row-count) window is required to bound the infinite input stream into a finite group.

- Row-window aggregates, i.e., aggregates which are computed for each row, based on a window (range) of preceding and succeeding rows.

Each type of aggregate shall be supported on keyed/grouped or non-keyed/grouped data streams for streaming tables as well as batch tables.

We are looking forward to your feedback.

Timo
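To make the two aggregate types in the announcement concrete, here is a minimal sketch on plain Scala collections. It is not the Table API and not the FLIP-11 design: `Event`, `tumblingSum`, and `rowWindowSum` are hypothetical names chosen for illustration, timestamps are assumed to be non-negative milliseconds, and the row window only looks at preceding rows.

```scala
object WindowSketch {
  // Hypothetical record type: grouping key, event time in ms, value.
  final case class Event(key: String, ts: Long, value: Int)

  // Group-window aggregate: one result per (key, tumbling window start).
  // The window bounds the stream into finite groups.
  def tumblingSum(events: Seq[Event], sizeMs: Long): Map[(String, Long), Int] =
    events
      .groupBy(e => (e.key, e.ts - (e.ts % sizeMs)))
      .map { case (group, es) => group -> es.map(_.value).sum }

  // Row-window aggregate: one result per input row, computed over the row
  // itself and up to `preceding` earlier rows with the same key.
  def rowWindowSum(events: Seq[Event], preceding: Int): Seq[(Event, Int)] =
    events.groupBy(_.key).values.toSeq.flatMap { es =>
      val sorted = es.sortBy(_.ts)
      sorted.indices.map { i =>
        val from = math.max(0, i - preceding)
        sorted(i) -> sorted.slice(from, i + 1).map(_.value).sum
      }
    }
}
```

The group-window variant returns fewer rows than it receives (one per group and window), while the row-window variant returns exactly one result per input row.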
Hi Jark,
you had asked for non-windowed aggregates in the Table API a few times. FLIP-11 proposes row-window aggregates, which are a generalization of running aggregates (SlideRow unboundedPreceding).

Can you have a look at the FLIP and give feedback whether this is what you are looking for? Improvement suggestions are very welcome as well.

Thank you,
Fabian

2016-09-01 16:12 GMT+02:00 Timo Walther <[hidden email]>:
> [...]
Hi all,
I thought about the API of the FLIP again. If we allow the "systemtime" attribute, we cannot implement nice method chaining where the user can define "allowLateness" only on event time. Even if the user expressed that "systemtime" is used, we would have to offer an "allowLateness" method, because we have to assume that this attribute can also be the batch event-time column, which is not very nice.

    class TumblingWindow(size: Expression) extends Window {
      def on(timeField: Expression): TumblingEventTimeWindow =
        new TumblingEventTimeWindow(alias, timeField, size) // has allowLateness() method
    }

What do you think?

Timo

On 05/09/16 at 10:41, Fabian Hueske wrote:
> [...]

--
Kind Regards

Timo Walther

Follow me: @twalthr
https://www.linkedin.com/in/twalthr
Hi all,
I'm on vacation for about five days, sorry to have missed this great FLIP.

Yes, non-windowed aggregates are a special case of row-windows, and the proposal looks really good. Can we have a simplified form for the special case? For example, table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…) could be simplified to table.groupBy('a).select(…). The latter would actually call the former.

Another question is about the rowtime. As the FLIP says, the DataStream and the StreamTableSource are responsible for assigning timestamps and watermarks; furthermore, "rowtime" and "systemtime" are not real columns. IMO, this is different from Calcite's rowtime, which is a real column in the table. With the FLIP's approach we will lose some flexibility, because the timestamp column may be created after some transformations or a join operation, not at the beginning. So why do we have to define rowtime at the beginning? (Because of watermarks?) Can we have a way to define rowtime after the source table, like TimestampAssigner?

Regarding the "allowLateness" method: I came up with a trick. We can make 'rowtime and 'systemtime Scala objects instead of symbol expressions. The API would look like this:

    window(Tumble over 10.minutes on rowtime allowLateness as 'w)

The implementation would look like this:

    class TumblingWindow(size: Expression) extends Window {
      def on(time: rowtime.type): TumblingEventTimeWindow =
        new TumblingEventTimeWindow(alias, 'rowtime, size) // has allowLateness() method

      def on(time: systemtime.type): TumblingProcessingTimeWindow =
        new TumblingProcessingTimeWindow(alias, 'systemtime, size) // has no allowLateness() method
    }
    object rowtime
    object systemtime

What do you think about this?

- Jark Wu

On 6 September 2016, at 11:00 PM, Timo Walther <[hidden email]> wrote:
> [...]
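The overloading trick proposed above can be tried out in plain Scala. The following is a minimal, self-contained sketch and does not use Flink: `Tumble`, `EventTimeWindow`, and `ProcessingTimeWindow` are simplified, hypothetical stand-ins for the window classes named in the mail, and window sizes are plain milliseconds instead of expressions.

```scala
object TimeAttrSketch {
  // Marker objects standing in for the two reserved time attributes.
  object rowtime
  object systemtime

  // Event-time windows can tolerate late data, so they expose allowLateness.
  final case class EventTimeWindow(sizeMs: Long, latenessMs: Long = 0L) {
    def allowLateness(ms: Long): EventTimeWindow = copy(latenessMs = ms)
  }

  // Processing-time windows have no notion of lateness: no such method here.
  final case class ProcessingTimeWindow(sizeMs: Long)

  final case class Tumble(sizeMs: Long) {
    // Overloads on the singleton types; the argument decides the result type.
    def on(t: rowtime.type): EventTimeWindow = EventTimeWindow(sizeMs)
    def on(t: systemtime.type): ProcessingTimeWindow = ProcessingTimeWindow(sizeMs)
  }
}
```

With these definitions, `Tumble(600000L).on(rowtime).allowLateness(1000L)` type-checks, while `Tumble(600000L).on(systemtime).allowLateness(1000L)` is rejected at compile time because `ProcessingTimeWindow` has no `allowLateness` member. The cost is that `on` no longer accepts an arbitrary column expression.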
Hi,
thanks for your comments and questions! Actually, you are bringing up the points that Timo and I discussed the most when designing the FLIP ;-)

- We also thought about the syntactic shortcut for running aggregates that you proposed (table.groupBy('a).select(…)). Our motivation for not allowing this shortcut is to prevent users from accidentally performing a "dangerous" operation. The problem with unbounded sliding row-windows is that their state never expires. If you have an evolving key space, you will likely run into problems at some point because the operator state grows too large. IMO, a row-window session is a better approach, because it defines a timeout after which state can be discarded. groupBy.select is a very common operation in batch, but its semantics in streaming are very different. In my opinion it makes sense to make users aware of these differences through the API.

- Reassigning timestamps and watermarks is a very delicate issue. You are right that Calcite exposes this field, which is necessary due to the semantics of SQL. However, in Calcite you also cannot freely choose the timestamp attribute for streaming queries (it must be a monotone or quasi-monotone attribute), which is hard to reason about (and guarantee) after a few operators have been applied. Streaming tables in Flink will likely have a time attribute which is identical to the initial rowtime. However, Flink does modify timestamps internally, e.g., for records that are emitted from time windows, in order to ensure that consecutive windows perform as expected. Modifying or reassigning timestamps in the middle of a job can lead to unexpected results which are very hard to reason about. Do you have a concrete use case in mind for reassigning timestamps?

- The idea to represent rowtime and systemtime as objects is good. Our motivation for going with reserved Scala symbols was to have a uniform syntax for windows over streaming and batch tables. On batch tables you can compute time windows over basically every time attribute (they are treated similarly to grouping attributes, with a bit of extra logic to extract the grouping key for sliding and session windows). If you write window(Tumble over 10.minutes on 'rowtime) on a streaming table, 'rowtime would indicate event time. On a batch table with a 'rowtime attribute, the same operator would be internally converted into a group by. By going for the object approach we would lose this compatibility (or would need to introduce an additional column attribute to specify the window attribute for batch tables).

As usual, some of the design decisions are based on preferences. Do they make sense to you? Let me know what you think.

Best, Fabian

2016-09-07 5:12 GMT+02:00 Jark Wu <[hidden email]>:
> [...]
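The state-growth argument against the groupBy.select shortcut can be illustrated with a small sketch in plain Scala. It is not Flink code: `RunningSum` and `SessionSum` are hypothetical classes, state is an in-memory map, and the session variant assumes rows arrive in timestamp order and uses the timestamp of the incoming row as the current time (a real implementation would rely on watermarks or timers instead).

```scala
object StateSketch {
  import scala.collection.mutable

  // Unbounded running aggregate: one state entry per key, never removed.
  final class RunningSum {
    private val state = mutable.Map.empty[String, Long]
    def add(key: String, value: Long): Long = {
      val sum = state.getOrElse(key, 0L) + value
      state(key) = sum
      sum
    }
    def stateSize: Int = state.size
  }

  // Session-style aggregate: a key's state is discarded once no row for
  // that key has arrived for more than `gapMs`.
  final class SessionSum(gapMs: Long) {
    private val state = mutable.Map.empty[String, (Long, Long)] // key -> (sum, last timestamp)
    def add(key: String, ts: Long, value: Long): Long = {
      // Drop every key whose last row is older than the session gap.
      val expired = state.collect { case (k, (_, last)) if ts - last > gapMs => k }.toList
      state --= expired
      val (previous, _) = state.getOrElse(key, (0L, ts))
      val sum = previous + value
      state(key) = (sum, ts)
      sum
    }
    def stateSize: Int = state.size
  }
}
```

With an evolving key space, `RunningSum` keeps one entry per key it has ever seen, whereas `SessionSum` only retains keys that were active within the last `gapMs`.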
Hi Fabian,
Thanks for sharing your ideas.

They all make sense to me. Regarding reassigning timestamps, I do not have a use case. I came up with this because DataStream has a TimestampAssigner :)

+1 for this FLIP.

- Jark Wu

On 7 September 2016, at 2:59 PM, Fabian Hueske <[hidden email]> wrote:
> [...]
Hi all,
It seems that there are no objections to the window design. So could we open subtasks to start working on it now?

- Jark Wu

On 7 September 2016, at 4:29 PM, Jark Wu <[hidden email]> wrote:
> [...]
Hi Jark,
Yes, I think enough time has passed. We can start implementing the changes. What do you think, Fabian?

If there are no objections, I will create the subtasks in Jira today. For FLIP-11/1 I have already implemented a prototype; I just have to do some refactoring and documentation before opening a PR.

Timo
Hi everybody,
Timo proposed our FLIP-11 a bit more than three weeks ago. I will update the status of the FLIP to accepted.

Thanks,
Fabian