Hi All,
As Jincheng brought up in the previous email, there is a set of improvements needed to make the Table API more complete and self-contained. To give a better overview, Jincheng, Jiangjie, Shaoxuan and I discussed this offline and came up with an initial outline:

Table API Enhancement Outline <https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit?usp=sharing>

Please take a look; your comments are welcome!

Regards,
Xiaowei
Hi, Xiaowei,
Thanks for bringing up the discussion of the Table API Enhancement Outline!

I quickly looked through the overall content, and it reflects our offline discussions well. From my point of view, though, we should also show the usage of the public interfaces that this proposal introduces. So I added the following usage descriptions of the interfaces and operators to the Google doc:

1. Map Operator
Map is a new operator on Table. It applies a scalar function and can return multiple columns. Usage:

    val res = tab
      .map(fun: ScalarFunction).as('a, 'b, 'c)
      .select('a, 'c)

2. FlatMap Operator
FlatMap is a new operator on Table. It applies a table function and can return multiple rows. Usage:

    val res = tab
      .flatMap(fun: TableFunction).as('a, 'b, 'c)
      .select('a, 'c)

3. Agg Operator
Agg is a new operator on Table/GroupedTable. It applies an aggregate function and can return multiple columns. Usage:

    val res = tab
      .groupBy('a) // leave groupBy-Clause out to define global aggregates
      .agg(fun: AggregateFunction).as('a, 'b, 'c)
      .select('a, 'c)

4. FlatAgg Operator
FlatAgg is a new operator on Table/GroupedTable. It applies a table aggregate function and can return multiple rows. Usage:

    val res = tab
      .groupBy('a) // leave groupBy-Clause out to define global table aggregates
      .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
      .select('a, 'c)

5. TableAggregateFunction
A table aggregate behaves most like a GroupReduceFunction: it is computed over a group of elements and outputs a group of elements. A TableAggregateFunction can be applied via GroupedTable.flatAgg(). Its interface is fairly large, so I won't copy it here; please see the details in the Google doc:

https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit

I would really appreciate it if anyone could review and comment.
Best,
Jincheng
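To make the four operator shapes above concrete, here is a minimal sketch on plain Scala collections. It is not Flink code and does not use the proposed API: `Order`, `orders` and the lambdas are hypothetical stand-ins for a table and its user-defined functions. It only illustrates that map is row-to-row, flatMap is row-to-rows, agg is group-to-row and flatAgg is group-to-rows.

```scala
// Hypothetical input "table": one row per order.
final case class Order(user: String, amount: Int)

val orders: Seq[Order] =
  Seq(Order("a", 3), Order("b", 5), Order("a", 7), Order("a", 1))

// map: every input row yields exactly one output row (here with two columns).
val mapped: Seq[(String, Int)] =
  orders.map(o => (o.user, o.amount * 2))

// flatMap: every input row yields zero or more output rows.
// Rows with amount < 2 are dropped, all others are split into two rows.
val flatMapped: Seq[(String, Int)] = orders.flatMap { o =>
  if (o.amount < 2) Seq.empty[(String, Int)]
  else Seq((o.user, o.amount / 2), (o.user, o.amount - o.amount / 2))
}

// Grouping by user plays the role of groupBy('a).
val grouped: Map[String, Seq[Order]] = orders.groupBy(_.user)

// agg: every group yields exactly one row, possibly multi-column: (sum, count).
val agged: Map[String, (Int, Int)] =
  grouped.map { case (user, rows) => (user, (rows.map(_.amount).sum, rows.size)) }

// flatAgg: every group yields zero or more rows, here the two largest amounts.
val flatAgged: Map[String, Seq[Int]] =
  grouped.map { case (user, rows) =>
    (user, rows.map(_.amount).sorted(Ordering[Int].reverse).take(2))
  }
```

The sketch keeps group keys as map keys only to stay small; whether keys appear in the operator output is exactly one of the open questions in this thread.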
Hi Jincheng,
Thanks for adding the public interfaces! I think it's a very good start. There are a few points that need more discussion.

- TableAggregateFunction: this is a very complex beast, definitely the most complex user-defined object we have introduced so far. I think there are quite a few interesting questions here. For example, do we allow multi-staged TableAggregate in this case? What are the semantics of emit? Does it amend the previous output, or replace it? I think this subject is worth a discussion of its own to make sure we get the details right.
- GroupedTable.agg: do the group keys automatically appear in the output? What about windowed aggregation?

Regards,
Xiaowei

On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <[hidden email]> wrote:
> [...]
Hi,
Thanks for the great design document! It answers my question regarding the handling of retraction messages. Overall, I like the proposal. It is well scoped and the proposed changes are well described.

I left a question regarding the handling of time attributes for multi-column output functions.

Thanks,
Fabian

On Tue, Nov 6, 2018 at 12:16, Xiaowei Jiang <[hidden email]> wrote:
> [...]
Hi Xiaowei,
Thank you for mentioning these key points. Yes, I think they are very important for a clear definition of the semantics of TableAggregateFunction! I'd like to share my thoughts on these questions:

1. Do we allow multi-staged TableAggregate in this case?
From my point of view, both Aggregate and TableAggregate should support multi-staged evaluation. For aggregates, multi-staged evaluation means pre-aggregation of the data. Take table.select(count(*)): using statistics (or maybe hints), the optimizer can generate a new plan that splits the count aggregate into a partial aggregate, which does the count, and a global aggregate, which sums the partial counts. Pre-aggregation can solve hot-spot issues, and partial/global aggregation is an important optimization for aggregates. Looking at the interface of TableAggregateFunction, the only difference between AggregateFunction and TableAggregateFunction is how the output is defined (getValue vs. emitValue); the rest of the calculation logic is the same. So I think TableAggregate would benefit from multi-staged support as well.

2. What are the semantics of emit? Does it amend the previous output, or replace it?
Currently, I think an aggregate uses getValue to update the old result, which matches the replacing behavior you describe. Frankly speaking, I don't quite understand what you have in mind with "the previous to the previous". I would be very grateful if you could explain it in detail.

3. Do the group keys automatically appear in the output of GroupedTable.agg?
I think so, because users usually calculate by key, and in 99% of cases they expect the keys to be included in the output. What do you think?

Best,
Jincheng

On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <[hidden email]> wrote:
> [...]
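The partial/global split of count(*) described above can be sketched with plain Scala collections. This is only an illustration under stated assumptions: `partitions`, `partialCount` and `globalCount` are hypothetical names, and a real optimizer would decide when to apply the split.

```scala
// Hypothetical input, already distributed over four partitions.
val partitions: Seq[Seq[String]] =
  Seq(Seq("x", "y", "z"), Seq("x"), Seq.empty[String], Seq("y", "y"))

// Partial aggregate: each partition counts its own rows.
def partialCount[A](partition: Seq[A]): Long = partition.size.toLong

// Global aggregate: the final stage sums the partial counts, it does not count again.
def globalCount(partials: Seq[Long]): Long = partials.sum

// Two-stage plan: count per partition, then sum.
val twoStage: Long = globalCount(partitions.map(p => partialCount(p)))

// Single-stage plan for comparison: every row travels to one place and is counted there.
val singleStage: Long = partitions.flatten.size.toLong
```

Both plans give the same result; the two-stage plan only ships one number per partition to the final stage, which is what relieves the hot spot.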
Hi Xiaowei,
Yes, I agree with you that the semantics of TableAggregateFunction emit are much more complex than those of AggregateFunction. The fundamental difference is that TableAggregateFunction emits a "table" while AggregateFunction outputs (a column of) a "row". AggregateFunction has only one mode, "replacing" (complete update). For TableAggregateFunction, the update could be incremental (emit only the newly updated results) or complete (always emit the entire table when "emit" is triggered). From a performance perspective, we might want to use incremental update. But we need to review and design this carefully, especially taking into account failover (instead of just backing up the ACC, it may also need to remember the emit offset) and retractions, as the semantics of TableAggregateFunction emit differ from those of other UDFs. TableFunction also emits a table, but it does not need to worry about this because it is stateless.

Regards,
Shaoxuan

On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <[hidden email]> wrote:
> [...]
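The difference between complete and incremental update can be sketched with a top-2 table aggregate on plain Scala lists. This is a sketch under assumptions, not the proposed interface: `accumulate`, `emitComplete` and `emitIncremental` are hypothetical names, and the incremental variant is modelled as a pair of retracted and added rows.

```scala
// The accumulator (ACC) is the current top-2 list, largest value first.
def accumulate(acc: List[Int], value: Int): List[Int] =
  (value :: acc).sorted(Ordering[Int].reverse).take(2)

// Complete update: every emit outputs the whole current result table.
def emitComplete(acc: List[Int]): List[Int] = acc

// Incremental update: emit only the difference to the previous emit,
// as (rows to retract, rows to add). The previously emitted table has to
// be remembered, which is extra state to restore after a failover.
def emitIncremental(previous: List[Int], current: List[Int]): (List[Int], List[Int]) =
  (previous.diff(current), current.diff(previous))

// Accumulator states after feeding 3, 5, 4 one at a time (starting empty).
val states: List[List[Int]] =
  List(3, 5, 4).scanLeft(List.empty[Int])(accumulate)
```

After the third input the complete emit is the full table `List(5, 4)`, while the incremental emit between the last two states retracts `3` and adds `4`.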
Hi,
* Re emit:
I think we should start with the well-understood semantics of full replacement. This is how the other agg functions work. As was said before, there are open questions regarding an append mode (checkpointing, whether to support retractions and, if so, how to declare them, ...). Since this seems to be an optimization, I'd postpone it.

* Re grouping keys:
I don't think we should automatically add them because the result schema would not be intuitive. Would they be added at the beginning of the tuple or at the end? What metadata fields of windows would be added? In which order would they be added?

However, we could support syntax like this:

    val t: Table = ???
    t
      .window(Tumble ... as 'w)
      .groupBy('a, 'b)
      .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)

The result schema would be clearly defined as [b, a, f1, f2, ..., fn, wend, rtime], where (f1, f2, ..., fn) are the result attributes of the UDF.

* Re multi-staged evaluation:
I think this should be an optimization that can be applied if the UDF implements the merge() method.

Best, Fabian

On Wed, Nov 7, 2018 at 08:01, Shaoxuan Wang <[hidden email]> wrote:
> [...]
Hi Fabian,
I think the key question you raised is whether we allow extra parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing that may appear more convenient in some cases. However, it might also cause some confusion. For example, do we allow multiple UDFs in these expressions? If we do, the semantics may be awkward to define, e.g. what does table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Not allowing it may appear less powerful, but it can also make things more intuitive.

In the case of agg/flatAgg, we can define the keys to be implied in the result table and to appear at the beginning. You can use a select method if you want to modify this behavior. I think that eventually we will have an API that allows other expressions as additional parameters, but it's better to do that after we introduce the concept of nested tables. A lot of what was suggested here can be considered special cases of that, and things are much simpler if we leave it for later.

Regards,
Xiaowei

On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <[hidden email]> wrote:
> [...]
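The "keys implied at the beginning, select to change it" behavior can be sketched on plain Scala collections. None of this is the proposed API: `Event`, `events` and `top2` are hypothetical, and the sort by key is only there to make the output order deterministic.

```scala
// Hypothetical input rows with a grouping key k and a value v.
final case class Event(k: String, v: Int)

val events: Seq[Event] =
  Seq(Event("x", 4), Event("y", 9), Event("x", 8), Event("x", 6))

// Hypothetical table aggregate: the two largest values of a group.
def top2(values: Seq[Int]): Seq[Int] =
  values.sorted(Ordering[Int].reverse).take(2)

// Grouped flatAgg with the key implied as the first output column:
// every row produced for a group is prefixed with that group's key.
val withKeys: Seq[(String, Int)] =
  events.groupBy(_.k).toSeq.sortBy(_._1).flatMap { case (k, rows) =>
    top2(rows.map(_.v)).map(v => (k, v))
  }

// A following select can drop (or reorder) the implied key column.
val valuesOnly: Seq[Int] = withKeys.map(_._2)
```

With a single function per flatAgg call the output schema is always (key columns, function columns), which is the simplification argued for above.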
Hi all,
We are discussing this proposal in great detail and are trying to design the API with many aspects in mind (functionality, compatibility, ease of use, etc.). I think this is a very good process: only such a detailed discussion lets us develop the PRs clearly and smoothly at a later stage. I am very grateful to @Fabian and @Xiaowei for sharing a lot of good ideas.

Regarding the definition of the method signatures, I want to share my points here, which I am discussing with Fabian in the Google doc (not yet completed):

Assume we have a table:

    val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)

Approach 1:

case 1: Map follows Source Table

    val result = tab
      .map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
      .window(Tumble over 5.millis on 'proctime as 'w)

case 2: FlatAgg follows Window (Fabian mentioned above)

    val result = tab
      .window(Tumble ... as 'w)
      .groupBy('w, 'k1, 'k2) // 'w should be a group key.
      .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
      .select('k1, 'col1, 'w.rowtime as 'rtime)

Approach 2: Similar to Fabian's approach, in which the result schema is clearly defined, but with an added built-in append UDF. That makes the map/flatMap/agg/flatAgg interfaces accept only one Expression.

    val result = tab
      .map(append(udf('string), 'long, 'proctime)).as('col1, 'col2, 'long, 'proctime)
      .window(Tumble over 5.millis on 'proctime as 'w)

Note: append is a special built-in UDF that can pass through any column.

So maybe we can define table.map(Expression) first and, if necessary, extend it to table.map(Expression*) in the future? Of course, I also hope that we can further improve this proposal through discussion.

Thanks,
Jincheng

(In reply to Xiaowei Jiang's message of Wed, Nov 7, 2018, 11:45 PM.)
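As a rough illustration of the append() idea in Approach 2, here is a minimal, self-contained Scala sketch. It is a toy model and does not use the Flink Table API: the `Row` type, the `udf` body, and the `mapWithAppend` helper are all hypothetical names chosen for this example. It only shows how an explicit list of forwarded fields fixes the output schema: UDF result columns first, then exactly the fields that were named.

```scala
// Toy model, not Flink code: a row is an ordered list of (fieldName, value) pairs.
object AppendSketch {
  type Row = Vector[(String, Any)]

  // Hypothetical scalar UDF: splits a string into two result columns.
  def udf(s: String): Vector[Any] = {
    val parts = s.split("#", 2)
    Vector(parts(0), if (parts.length > 1) parts(1) else "")
  }

  // Models map(append(udf(input), forwarded...)).as(names...):
  // the UDF result comes first, followed by the explicitly forwarded fields.
  def mapWithAppend(row: Row, input: String, forwarded: Seq[String], names: Seq[String]): Row = {
    val lookup = row.toMap
    val values = udf(lookup(input).toString) ++ forwarded.map(lookup)
    require(values.length == names.length, "as(...) must name every output column")
    names.toVector.zip(values)
  }

  def main(args: Array[String]): Unit = {
    val row: Row = Vector("long" -> 1L, "string" -> "a#b", "proctime" -> 42L)
    val out = mapWithAppend(row, "string", Seq("long", "proctime"),
      Seq("col1", "col2", "long", "proctime"))
    println(out)
  }
}
```

In this model nothing is forwarded unless it is listed, which is the property that makes the result schema easy to read off the call site.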
Hi Jincheng,
Thanks for the summary!

I like the approach with append() better than the implicit forwarding as it clearly indicates which fields are forwarded. However, I don't see much benefit over the flatMap(Expression*) variant, as we would still need to analyze the full expression tree to ensure that at most (or exactly?) one Scalar / TableFunction is used.

Best,
Fabian

(In reply to jincheng sun's message of Thu, Nov 8, 2018, 19:25.)
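The expression-tree check mentioned above could look roughly like the following Scala sketch. The `Expr` hierarchy here is a hypothetical, heavily simplified stand-in for Flink's real expression classes, and the rule "exactly one function call" is an assumption, since the thread leaves open whether it should be "at most one" or "exactly one". The sketch models the map(Expression*) variant, where the UDF call and the forwarded fields are sibling arguments.

```scala
object ExpressionCheckSketch {
  // Hypothetical, simplified expression tree; Flink's actual Expression classes differ.
  sealed trait Expr
  final case class Field(name: String) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr
  final case class Call(function: String, args: Seq[Expr]) extends Expr

  // Counts function calls anywhere in the tree, including nested ones.
  def countCalls(e: Expr): Int = e match {
    case Field(_)      => 0
    case Alias(c, _)   => countCalls(c)
    case Call(_, args) => 1 + args.map(countCalls).sum
  }

  // Validates a varargs-style argument list: exactly one function call overall (assumed rule).
  def validate(exprs: Seq[Expr]): Either[String, Seq[Expr]] = {
    val n = exprs.map(countCalls).sum
    if (n == 1) Right(exprs)
    else Left(s"expected exactly one function call, found $n")
  }
}
```

For example, `validate(Seq(Call("udf", Seq(Field("string"))), Field("long")))` is accepted, while two sibling calls such as `Seq(Call("TableAggA", ...), Call("TableAggB", ...))` are rejected with an error message. The same traversal would be needed whether or not the arguments are wrapped in append().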
Hi Fabian/Xiaowei,
I am very sorry for my late reply! Glad to see your reply, and it sounds pretty good!

I agree with Fabian that the append() approach is better because it clearly defines the result schema. In addition, append() can also contain non-time attributes, e.g.:

    tab('name, 'age, 'address, 'rowtime)
    tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
      .window(Tumble over 5.millis on 'rowtime as 'w)
      .groupBy('w, 'address)

In this way append() is very useful, and its behavior is very similar to withForwardedFields() in DataSet. So +1 to using the append() approach for map() and flatMap()!

But how about agg() and flatAgg()? In the agg/flatAgg case I agree with Xiaowei's approach of defining the keys to be implied in the result table and to appear at the beginning, for example:

    tab.window(Tumble ... as 'w)
      .groupBy('w, 'k1, 'k2) // 'w should be a group key.
      .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
      .select('k1, 'col1, 'w.rowtime as 'rtime)

What do you think? @Fabian @Xiaowei

Thanks,
Jincheng

(In reply to Fabian Hueske's message of Fri, Nov 9, 2018, 6:35 PM.)
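To make the "keys implied at the beginning" convention for agg() more tangible, here is a small Scala sketch using plain collections rather than the Flink Table API. The `agg` function (returning a count and a sum) and the `groupAgg` helper are hypothetical, and the window key 'w is left out for brevity. Each output row consists of the grouping keys followed by the aggregate's result columns.

```scala
object KeysFirstAggSketch {
  // Hypothetical multi-column aggregate: returns (count, sum) for a group of values.
  def agg(values: Seq[Long]): Seq[Long] = Seq(values.size.toLong, values.sum)

  // Groups (k1, k2, a) rows and emits one row per group:
  // grouping keys first, then the aggregate's result columns.
  def groupAgg(rows: Seq[(String, String, Long)]): Seq[Seq[Any]] =
    rows
      .groupBy { case (k1, k2, _) => (k1, k2) }
      .toSeq
      .sortBy(_._1) // deterministic order for the example only
      .map { case ((k1, k2), group) => Seq[Any](k1, k2) ++ agg(group.map(_._3)) }
}
```

Under this convention the caller never lists the keys, so the column order [k1, k2, col1, col2] is a fixed rule that has to be documented rather than something visible in the call itself.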
Hi Jincheng,
I said before, that I think that the append() method is better than implicitly forwarding keys, but still, I believe it adds unnecessary boiler plate code. Moreover, I haven't seen a convincing argument why map(Expression*) is worse than map(Expression). In either case we need to do all kinds of checks to prevent invalid use of functions. If the method is not correctly used, we can emit a good error message and documenting map(Expression*) will be easier than map(append(Expression*)), in my opinion. I think we should not add unnessary syntax unless there is a good reason and to be honest, I haven't seen this reason yet. Regarding the groupBy.agg() method, I think it should behave just like any other method, i.e., not do any implicit forwarding. Let's take the example of the windowed group by, that you posted before. tab.window(Tumble ... as 'w) .groupBy('w, 'k1, 'k2) // 'w should be a group key. .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2) .select('k1, 'col1, 'w.rowtime as 'rtime) What happens if 'w.rowtime is not selected? What is the data type of the field 'w in the resulting Table? Is it a regular field at all or just a system field that disappears if it is not selected? IMO, the following syntax is shorter, more explicit, and better aligned with the regular window.groupBy.select aggregations that are supported today. tab.window(Tumble ... as 'w) .groupBy('w, 'k1, 'k2) // 'w should be a group key. .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a)) Best, Fabian Am Mi., 14. Nov. 2018 um 08:37 Uhr schrieb jincheng sun < [hidden email]>: > Hi Fabian/Xiaowei, > > I am very sorry for my late reply! Glad to see your reply, and sounds > pretty good! > I agree that the approach with append() which can clearly defined the > result schema is better which Fabian mentioned. 
|
Hi,
Isn't the problem of multiple expressions limited only to `flat***` functions, and more specifically only to having two (or more) different table functions passed as expressions? `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))` seems to be well defined (duplicate the result of every scalar function to every record). Or am I missing something?

Another remark: I would be in favour of not using abbreviations and naming `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.

Piotrek
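One possible reading of the "duplicate the result of every scalar function to every record" remark, sketched with plain Scala collections rather than the Table API. Everything here is hypothetical: FlatAggSketch, In, Out, top2 (standing in for a table aggregate) and upper (standing in for a scalar function). The sketch also assumes the scalar function is applied to the grouping key, so that it has exactly one value per group; the thread does not state this.

```scala
// Toy model: one table aggregate plus one scalar function per group.
object FlatAggSketch {
  final case class In(k: String, a: Int)
  final case class Out(k: String, upperK: String, top: Int)

  // Hypothetical table aggregate: emits up to two rows, the largest values first.
  def top2(values: Seq[Int]): Seq[Int] =
    values.sorted(Ordering[Int].reverse).take(2)

  // Hypothetical scalar function, evaluated once per group on the key.
  def upper(k: String): String = k.toUpperCase

  def flatAgg(rows: Seq[In]): Seq[Out] =
    rows.groupBy(_.k).toSeq.sortBy(_._1).flatMap { case (k, group) =>
      val scalar = upper(k)
      // The single scalar result is repeated on every row the table aggregate emits.
      top2(group.map(_.a)).map(t => Out(k, scalar, t))
    }
}
```

With two table aggregates in the same call there would be two row sets per group and no obvious way to pair them, which is the case the thread treats as ill-defined.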
|
Hi Fabian & Piotr, thanks for the feedback!
I appreciate your concerns, both about timestamp attributes and about implicit group keys. At the same time, I'm also concerned about the proposed approach of allowing Expression* as parameters, especially for flatMap/flatAgg. So far, we have never allowed a scalar expression to appear together with table expressions. With the Expression* approach, this will happen for the parameters to flatMap/flatAgg. I'm a bit concerned about whether we fully understand the consequences when we try to extend our system in the future. I would be extra cautious in doing this. To avoid this, I think an implicit group key for flatAgg is safer. For flatMap, if users want to keep the rowtime column, they can use crossApply/join instead. So we are not losing any real functionality here.

Also a clarification on the following example:

    tab.window(Tumble ... as 'w)
      .groupBy('w, 'k1, 'k2) // 'w should be a group key.
      .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
      .select('k1, 'col1, 'w.rowtime as 'rtime)

If we did not have the select clause in this example, we would have 'w as a regular column in the output. It should not magically disappear.

The concern is not as strong for Table.map/Table.agg because we are not mixing scalar and table expressions. But we also want to be a bit consistent with these methods. If we used implicit group keys for Table.flatAgg, we probably should do the same for Table.agg. Now we only have to choose what to do with Table.map. I can see good arguments from both sides. But starting with a single Expression seems safer because we can always extend to Expression* in the future.

While thinking about this problem, it appears that we may need more work in our handling of watermarks for SQL/Table API. Our current way of propagating the watermarks from source all the way to sink might not be optimal. For example, after a tumbling window, the watermark can actually be advanced to just before the expiry of the next window.
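To make the tumbling-window remark concrete, here is a minimal arithmetic sketch in plain Scala. It is not Flink code, and the names TumblingWatermarkSketch, nextWindowMaxTimestamp and advancedWatermark are hypothetical. It assumes windows of the form [k * size, (k + 1) * size) aligned at 0, that a window fires once the input watermark reaches its last timestamp (end - 1), that results carry that timestamp, and that a watermark t means no further element with timestamp <= t will arrive.

```scala
// Sketch only: how far a watermark could be advanced after a tumbling window,
// under the assumptions listed in the lead-in.
object TumblingWatermarkSketch {
  // Last timestamp (end - 1) of the earliest window that has not fired yet,
  // given that every window with last timestamp <= wm has already fired.
  def nextWindowMaxTimestamp(wm: Long, size: Long): Long = {
    require(size > 0, "window size must be positive")
    (Math.floorDiv(wm + 1, size) + 1) * size - 1
  }

  // The next result is stamped nextWindowMaxTimestamp, so the operator may
  // declare everything strictly before it as complete.
  def advancedWatermark(wm: Long, size: Long): Long =
    nextWindowMaxTimestamp(wm, size) - 1
}
```

For a window size of 10 and an input watermark of 12, the window [0, 10) has fired and the next result will be stamped 19, so the outgoing watermark could be 18 instead of 12.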
I think that in general, each operator may need to generate new watermarks instead of simply propagating them. Once we accept that watermarks may change during the execution, it appears that the timestamp columns may also change, as long as we have some way to associate a watermark with them. My intuition is that once we have a thorough solution for the watermark issue, we may be able to solve the problem we encountered for Table.map in a cleaner way. But this is a complex issue which deserves a discussion of its own.

Regards,
Xiaowei
In which order would > >> they > >>> be > >>>>>> added? > >>>>>> > >>>>>> However, we could support syntax like this: > >>>>>> val t: Table = ??? > >>>>>> t > >>>>>> .window(Tumble ... as 'w) > >>>>>> .groupBy('a, 'b) > >>>>>> .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as > >>>> 'rtime) > >>>>>> > >>>>>> The result schema would be clearly defined as [b, a, f1, f2, ..., > >> fn, > >>>>> wend, > >>>>>> rtime]. (f1, f2, ...fn) are the result attributes of the UDF. > >>>>>> > >>>>>> * Re Multi-staged evaluation: > >>>>>> I think this should be an optimization that can be applied if the > >> UDF > >>>>>> implements the merge() method. > >>>>>> > >>>>>> Best, Fabian > >>>>>> > >>>>>> Am Mi., 7. Nov. 2018 um 08:01 Uhr schrieb Shaoxuan Wang < > >>>>>> [hidden email] > >>>>>>> : > >>>>>> > >>>>>>> Hi xiaowei, > >>>>>>> > >>>>>>> Yes, I agree with you that the semantics of > >> TableAggregateFunction > >>>> emit > >>>>>> is > >>>>>>> much more complex than AggregateFunction. The fundamental > >>> difference > >>>> is > >>>>>>> that TableAggregateFunction emits a "table" while > >> AggregateFunction > >>>>>> outputs > >>>>>>> (a column of) a "row". In the case of AggregateFunction it only > >> has > >>>> one > >>>>>>> mode which is “replacing” (complete update). But for > >>>>>>> TableAggregateFunction, it could be incremental (only emit the > >> new > >>>>>> updated > >>>>>>> results) update or complete update (always emit the entire table > >>> when > >>>>>>> “emit" is triggered). From the performance perspective, we might > >>>> want > >>>>> to > >>>>>>> use incremental update. But we need review and design this > >>> carefully, > >>>>>>> especially taking into account the cases of the failover (instead > >>> of > >>>>> just > >>>>>>> back-up the ACC it may also needs to remember the emit offset) > >> and > >>>>>>> retractions, as the semantics of TableAggregateFunction emit are > >>>>>> different > >>>>>>> than other UDFs. 
TableFunction also emits a table, but it does > >> not > >>>> need > >>>>>> to > >>>>>>> worry this due to the nature of stateless. > >>>>>>> > >>>>>>> Regards, > >>>>>>> Shaoxuan > >>>>>>> > >>>>>>> > >>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <[hidden email] > >>> > >>>>> wrote: > >>>>>>> > >>>>>>>> Hi Jincheng, > >>>>>>>> > >>>>>>>> Thanks for adding the public interfaces! I think that it's a > >> very > >>>>> good > >>>>>>>> start. There are a few points that we need to have more > >>>> discussions. > >>>>>>>> > >>>>>>>> - TableAggregateFunction - this is a very complex beast, > >>>>> definitely > >>>>>>> the > >>>>>>>> most complex user defined objects we introduced so far. I > >>> think > >>>>>> there > >>>>>>>> are > >>>>>>>> quite some interesting questions here. For example, do we > >>> allow > >>>>>>>> multi-staged TableAggregate in this case? What is the > >>> semantics > >>>> of > >>>>>>>> emit? Is > >>>>>>>> it amendments to the previous output, or replacing it? I > >> think > >>>>> that > >>>>>>> this > >>>>>>>> subject itself is worth a discussion to make sure we get the > >>>>> details > >>>>>>>> right. > >>>>>>>> - GroupedTable.agg - does the group keys automatically > >> appear > >>> in > >>>>> the > >>>>>>>> output? how about the case of windowing aggregation? > >>>>>>>> > >>>>>>>> Regards, > >>>>>>>> Xiaowei > >>>>>>>> > >>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun < > >>>>> [hidden email]> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi, Xiaowei, > >>>>>>>>> > >>>>>>>>> Thanks for bring up the discuss of Table API Enhancement > >>> Outline > >>>> ! > >>>>>>>>> > >>>>>>>>> I quickly looked at the overall content, these are good > >>>> expressions > >>>>>> of > >>>>>>>> our > >>>>>>>>> offline discussions. But from the points of my view, we > >> should > >>>> add > >>>>>> the > >>>>>>>>> usage of public interfaces that we will introduce in this > >>>> propose. 
> >>>>>>> So, I > >>>>>>>>> added the following usage description of interface and > >>> operators > >>>>> in > >>>>>>>>> google doc: > >>>>>>>>> > >>>>>>>>> 1. Map Operator > >>>>>>>>> Map operator is a new operator of Table, Map operator can > >>>>> apply a > >>>>>>>>> scalar function, and can return multi-column. The usage as > >>>> follows: > >>>>>>>>> > >>>>>>>>> val res = tab > >>>>>>>>> .map(fun: ScalarFunction).as(‘a, ‘b, ‘c) > >>>>>>>>> .select(‘a, ‘c) > >>>>>>>>> > >>>>>>>>> 2. FlatMap Operator > >>>>>>>>> FaltMap operator is a new operator of Table, FlatMap > >>> operator > >>>>> can > >>>>>>>> apply > >>>>>>>>> a table function, and can return multi-row. The usage as > >>> follows: > >>>>>>>>> > >>>>>>>>> val res = tab > >>>>>>>>> .flatMap(fun: TableFunction).as(‘a, ‘b, ‘c) > >>>>>>>>> .select(‘a, ‘c) > >>>>>>>>> > >>>>>>>>> 3. Agg Operator > >>>>>>>>> Agg operator is a new operator of Table/GroupedTable, Agg > >>>>>> operator > >>>>>>>> can > >>>>>>>>> apply a aggregate function, and can return multi-column. The > >>>> usage > >>>>> as > >>>>>>>>> follows: > >>>>>>>>> > >>>>>>>>> val res = tab > >>>>>>>>> .groupBy(‘a) // leave groupBy-Clause out to define > >> global > >>>>>>>> aggregates > >>>>>>>>> .agg(fun: AggregateFunction).as(‘a, ‘b, ‘c) > >>>>>>>>> .select(‘a, ‘c) > >>>>>>>>> > >>>>>>>>> 4. FlatAgg Operator > >>>>>>>>> FlatAgg operator is a new operator of Table/GroupedTable, > >>>>> FaltAgg > >>>>>>>>> operator can apply a table aggregate function, and can return > >>>>>>> multi-row. > >>>>>>>>> The usage as follows: > >>>>>>>>> > >>>>>>>>> val res = tab > >>>>>>>>> .groupBy(‘a) // leave groupBy-Clause out to define > >>> global > >>>>>> table > >>>>>>>>> aggregates > >>>>>>>>> .flatAgg(fun: TableAggregateFunction).as(‘a, ‘b, ‘c) > >>>>>>>>> .select(‘a, ‘c) > >>>>>>>>> > >>>>>>>>> 5. 
TableAggregateFunction > >>>>>>>>> The behavior of table aggregates is most like > >>>>>> GroupReduceFunction > >>>>>>>> did, > >>>>>>>>> which computed for a group of elements, and output a group > >> of > >>>>>>> elements. > >>>>>>>>> The TableAggregateFunction can be applied on > >>>>> GroupedTable.flatAgg() . > >>>>>>> The > >>>>>>>>> interface of TableAggregateFunction has a lot of content, so > >> I > >>>>> don't > >>>>>>> copy > >>>>>>>>> it here, Please look at the detail in google doc: > >>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit > >>>>>>>>> > >>>>>>>>> I will be very appreciate to anyone for reviewing and > >>> commenting. > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Jincheng > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > > |
Hi all,

Thanks, everyone, for the feedback.

@Piotr: +1 on not using abbreviated names, I like your proposal! Both the DataSet and DataStream APIs currently use `aggregate`. BTW, other languages such as R also avoid abbreviated names.

An API is really difficult to get perfect up front: it takes a lot of thought, feedback from a large number of users, and continuous improvement. Because of backward compatibility, however, we have to take the most conservative approach when designing the API. (Of course, I am in favor of developing richer features once we have discussed them clearly.) Therefore, I propose splitting the implementation of map/flatMap/agg/flatAgg into JIRAs for the basic functionality and JIRAs for time attribute and group key support. We can start developing the features whose design we have already agreed on, and continue discussing the parts that are still uncertain.

Besides the API design, there will be various performance optimization details. For example, a table aggregate function's emitValue generates multiple results, so in extreme cases every record will trigger a large number of retract messages, which performs poorly. We will therefore also optimize the interface design, for example by adding an emitWithRetractValue interface (I have updated the Google doc) that lets the user optionally perform incremental calculation and thus avoid a large number of retractions. Details like this are difficult to discuss fully on the mailing list, so I recommend creating the JIRAs/FLIP first: we develop the designs that have been agreed upon and continue to discuss the ones that are not yet settled. What do you think? @Fabian & Piotr & Xiaowei

Best,
Jincheng
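To make the retraction concern concrete, here is a minimal sketch that does not use Flink at all. It models a top-2 table aggregate with two emit styles and counts the change messages each one sends downstream. All names in it (Change, Insert, Retract, Top2Acc, Top2, Top2Demo) are hypothetical, and the method signatures are assumptions for illustration only; they are not the interface from the Google doc.

```scala
// Hypothetical, Flink-free model of the two emit styles discussed in this thread.
sealed trait Change
final case class Insert(value: Int) extends Change
final case class Retract(value: Int) extends Change

// Accumulator of a top-2 table aggregate. `emitted` remembers the rows
// that were sent downstream by the previous emit call.
final case class Top2Acc(top: List[Int] = Nil, emitted: List[Int] = Nil)

object Top2 {
  // Keep only the two largest values seen so far.
  def accumulate(acc: Top2Acc, v: Int): Top2Acc =
    acc.copy(top = (v :: acc.top).sorted(Ordering[Int].reverse).take(2))

  // Full replacement: retract every previously emitted row, then emit all rows.
  def emitValue(acc: Top2Acc): (Top2Acc, List[Change]) = {
    val changes = acc.emitted.map(Retract(_)) ++ acc.top.map(Insert(_))
    (acc.copy(emitted = acc.top), changes)
  }

  // Incremental: retract only rows that dropped out, insert only new rows.
  def emitWithRetractValue(acc: Top2Acc): (Top2Acc, List[Change]) = {
    val retracts = acc.emitted.diff(acc.top).map(Retract(_))
    val inserts = acc.top.diff(acc.emitted).map(Insert(_))
    (acc.copy(emitted = acc.top), retracts ++ inserts)
  }
}

object Top2Demo {
  // Feed the values one by one, emit after every record, and count the
  // change messages that would reach the downstream operator.
  def countChanges(values: List[Int],
                   emit: Top2Acc => (Top2Acc, List[Change])): Int = {
    val (_, total) = values.foldLeft((Top2Acc(), 0)) { case ((acc, n), v) =>
      val (next, changes) = emit(Top2.accumulate(acc, v))
      (next, n + changes.size)
    }
    total
  }
}
```

For the input 1, 5, 3, 4 the full-replacement style sends 12 change messages and the incremental style sends 6. The gap grows with the size of the emitted result, which is why an optional incremental emit method looks worthwhile. The price is that the accumulator has to remember what it emitted last, and that is the failover and checkpointing question raised earlier in the thread.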
|
+1. I agree that we should open the JIRAs to start the work. We may
have better ideas on the flavor of the interface when we implement and review the code. Regards, Shaoxuan |
Hi,
Thank you all for the great proposal and discussion! I also prefer to move on to the next step, so +1 for opening the JIRAs to start the work. We can have more detailed discussion there. Btw, we can start with JIRAs which we have agreed on. Best, Hequn On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <[hidden email]> wrote: > +1. I agree that we should open the JIRAs to start the work. We may > have better ideas on the flavor of the interface when implement/review > the code. > > Regards, > shaoxuan > > > On 11/20/18, jincheng sun <[hidden email]> wrote: > > Hi all, > > > > Thanks all for the feedback. > > > > @Piotr About not using abbreviations naming, +1,I like > > your proposal!Currently both DataSet and DataStream API are using > > `aggregate`, > > BTW,I find other language also not using abbreviations naming,such as R. > > > > Sometimes the interface of the API is really difficult to perfect, we > need > > to spend a lot of time thinking and feedback from a large number of > users, > > and constantly improve, but for backward compatibility issues, we have to > > adopt the most conservative approach when designing the API(Of course, I > am > > more in favor of developing more rich features, when we discuss clearly). > > Therefore, I propose to divide the function implementation of > > map/faltMap/agg/flatAgg into basic functions of JIRAs and JIRAs that > > support time attributes and groupKeys. We can develop the features which > > we have already agreed on the design. And we will continue to discuss > the > > uncertain design. 
> > > > In fact, in addition to the design of APIs, there will be various > > performance optimization details, such as: table Aggregate function > > emitValue will generate multiple calculation results, in extreme cases, > > each record will trigger a large number of retract messages, this will > have > > poor performance,so we will also optimize the interface design, such as > > adding the emitWithRetractValue interface (I have updated the google doc) > > to allow the user to optionally perform incremental calculations, thus > > avoiding a large number of retracts. Details like this are difficult to > > fully discuss in the mail list, so I recommend creating JIRAs/FLIP first, > > we develop designs that have been agreed upon and continue to discuss > > non-deterministic designs! What do you think? @Fabian & Piotr & XiaoWei > > > > Best, > > Jincheng > > > > Xiaowei Jiang <[hidden email]> 于2018年11月19日周一 上午12:07写道: > > > >> Hi Fabian & Piotr, thanks for the feedback! > >> > >> I appreciate your concerns, both on timestamp attributes as well as on > >> implicit group keys. At the same time, I'm also concerned with the > >> proposed > >> approach of allowing Expression* as parameters, especially for > >> flatMap/flatAgg. So far, we never allowed a scalar expression to appear > >> together with table expressions. With the Expression* approach, this > will > >> happen for the parameters to flatMap/flatAgg. I'm a bit concerned on if > >> we > >> fully understand the consequences when we try to extend our system in > the > >> future. I would be extra cautious in doing this. To avoid this, I think > >> an > >> implicit group key for flatAgg is safer. For flatMap, if users want to > >> keep > >> the rowtime column, he can use crossApply/join instead. So we are not > >> losing any real functionality here. > >> > >> Also a clarification on the following example: > >> tab.window(Tumble ... as 'w) > >> .groupBy('w, 'k1, 'k2) // 'w should be a group key. 
> >> .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2) > >> .select('k1, 'col1, 'w.rowtime as 'rtime) > >> If we did not have the select clause in this example, we will have 'w as > >> a > >> regular column in the output. It should not magically disappear. > >> > >> The concern is not as strong for Table.map/Table.agg because we are not > >> mixing scalar and table expressions. But we also want to be a bit > >> consistent with these methods. If we used implicit group keys for > >> Table.flatAgg, we probably should do the same for Table.agg. Now we only > >> have to choose what to do with Table.map. I can see good arguments from > >> both sides. But starting with a single Expression seems safer because > >> that > >> we can always extend to Expression* in the future. > >> > >> While thinking about this problem, it appears that we may need more work > >> in > >> our handling of watermarks for SQL/Table API. Our current way of > >> propagating the watermarks from source all the way to sink might not be > >> optimal. For example, after a tumbling window, the watermark can > actually > >> be advanced to just before the expiring of next window. I think that in > >> general, each operator may need to generate new watermarks instead of > >> simply propagating them. Once we accept that watermarks may change > during > >> the execution, it appears that the timestamp columns may also change, as > >> long as we have some way to associate watermark with it. My intuition is > >> that once we have a through solution for the watermark issue, we may be > >> able to solve the problem we encountered for Table.map in a cleaner way. > >> But this is a complex issue which deserves a discussion on its own. 
> >> > >> Regards, > >> Xiaowei > >> > >> > >> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski < > [hidden email]> > >> wrote: > >> > >> > Hi, > >> > > >> > Isn’t the problem of multiple expressions limited only to `flat***` > >> > functions and to be more specific only to having two (or more) > >> > different > >> > table functions passed as an expressions? `.flatAgg(TableAggA('a), > >> > scalarFunction1(‘b), scalarFunction2(‘c))` seems to be well defined > >> > (duplicate result of every scalar function to every record. Or am I > >> missing > >> > something? > >> > > >> > Another remark, I would be in favour of not using abbreviations and > >> naming > >> > `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`. > >> > > >> > Piotrek > >> > > >> > > On 15 Nov 2018, at 14:15, Fabian Hueske <[hidden email]> wrote: > >> > > > >> > > Hi Jincheng, > >> > > > >> > > I said before, that I think that the append() method is better than > >> > > implicitly forwarding keys, but still, I believe it adds unnecessary > >> > boiler > >> > > plate code. > >> > > > >> > > Moreover, I haven't seen a convincing argument why map(Expression*) > >> > > is > >> > > worse than map(Expression). In either case we need to do all kinds > of > >> > > checks to prevent invalid use of functions. > >> > > If the method is not correctly used, we can emit a good error > message > >> and > >> > > documenting map(Expression*) will be easier than > >> > map(append(Expression*)), > >> > > in my opinion. > >> > > I think we should not add unnessary syntax unless there is a good > >> reason > >> > > and to be honest, I haven't seen this reason yet. > >> > > > >> > > Regarding the groupBy.agg() method, I think it should behave just > >> > > like > >> > any > >> > > other method, i.e., not do any implicit forwarding. > >> > > Let's take the example of the windowed group by, that you posted > >> before. > >> > > > >> > > tab.window(Tumble ... as 'w) > >> > > .groupBy('w, 'k1, 'k2) // 'w should be a group key. 
> >> > > .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2) > >> > > .select('k1, 'col1, 'w.rowtime as 'rtime) > >> > > > >> > > What happens if 'w.rowtime is not selected? What is the data type of > >> the > >> > > field 'w in the resulting Table? Is it a regular field at all or > just > >> > > a > >> > > system field that disappears if it is not selected? > >> > > > >> > > IMO, the following syntax is shorter, more explicit, and better > >> > > aligned > >> > > with the regular window.groupBy.select aggregations that are > >> > > supported > >> > > today. > >> > > > >> > > tab.window(Tumble ... as 'w) > >> > > .groupBy('w, 'k1, 'k2) // 'w should be a group key. > >> > > .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a)) > >> > > > >> > > > >> > > Best, Fabian > >> > > > >> > > Am Mi., 14. Nov. 2018 um 08:37 Uhr schrieb jincheng sun < > >> > > [hidden email]>: > >> > > > >> > >> Hi Fabian/Xiaowei, > >> > >> > >> > >> I am very sorry for my late reply! Glad to see your reply, and > >> > >> sounds > >> > >> pretty good! > >> > >> I agree that the approach with append() which can clearly defined > >> > >> the > >> > >> result schema is better which Fabian mentioned. > >> > >> In addition and append() and also contains non-time attributes, > >> > >> e.g.: > >> > >> > >> > >> tab('name, 'age, 'address, 'rowtime) > >> > >> tab.map(append(udf('name), 'address, 'rowtime).as('col1, 'col2, > >> > >> 'address, 'rowtime) > >> > >> .window(Tumble over 5.millis on 'rowtime as 'w) > >> > >> .groupBy('w, 'address) > >> > >> > >> > >> In this way the append() is very useful, and the behavior is very > >> > similar > >> > >> to withForwardedFields() in DataSet. > >> > >> So +1 to using append() approach for the map()&flatmap()! > >> > >> > >> > >> But how about the agg() and flatAgg()? 
In agg/flatAgg case I agree > >> > >> Xiaowei's approach that define the keys to be implied in the result > >> > table > >> > >> and appears at the beginning, for example as follows: > >> > >> tab.window(Tumble ... as 'w) > >> > >> .groupBy('w, 'k1, 'k2) // 'w should be a group key. > >> > >> .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2) > >> > >> .select('k1, 'col1, 'w.rowtime as 'rtime) > >> > >> > >> > >> What to you think? @Fabian @Xiaowei > >> > >> > >> > >> Thanks, > >> > >> Jincheng > >> > >> > >> > >> Fabian Hueske <[hidden email]> 于2018年11月9日周五 下午6:35写道: > >> > >> > >> > >>> Hi Jincheng, > >> > >>> > >> > >>> Thanks for the summary! > >> > >>> I like the approach with append() better than the implicit > >> > >>> forwarding > >> > as > >> > >> it > >> > >>> clearly indicates which fields are forwarded. > >> > >>> However, I don't see much benefit over the flatMap(Expression*) > >> > variant, > >> > >> as > >> > >>> we would still need to analyze the full expression tree to ensure > >> that > >> > at > >> > >>> most (or exactly?) one Scalar / TableFunction is used. > >> > >>> > >> > >>> Best, > >> > >>> Fabian > >> > >>> > >> > >>> Am Do., 8. Nov. 2018 um 19:25 Uhr schrieb jincheng sun < > >> > >>> [hidden email]>: > >> > >>> > >> > >>>> Hi all, > >> > >>>> > >> > >>>> We are discussing very detailed content about this proposal. We > >> > >>>> are > >> > >>> trying > >> > >>>> to design the API in many aspects (functionality, compatibility, > >> ease > >> > >> of > >> > >>>> use, etc.). I think this is a very good process. Only such a > >> detailed > >> > >>>> discussion, In order to develop PR more clearly and smoothly in > >> > >>>> the > >> > >> later > >> > >>>> stage. I am very grateful to @Fabian and @Xiaowei for sharing a > >> > >>>> lot > >> > of > >> > >>>> good ideas. 
> >> > >>>> About the definition of method signatures I want to share my > >> > >>>> points > >> > >> here > >> > >>>> which I am discussing with fabian in google doc (not yet > >> > >>>> completed), > >> > as > >> > >>>> follows: > >> > >>>> > >> > >>>> Assume we have a table: > >> > >>>> val tab = util.addTable[(Long, String)]("MyTable", 'long, > 'string, > >> > >>>> 'proctime.proctime) > >> > >>>> > >> > >>>> Approach 1: > >> > >>>> case1: Map follows Source Table > >> > >>>> val result = > >> > >>>> tab.map(udf('string)).as('proctime, 'col1, 'col2)// proctime > >> implied > >> > >> in > >> > >>>> the output > >> > >>>> .window(Tumble over 5.millis on 'proctime as 'w) > >> > >>>> > >> > >>>> case2: FatAgg follows Window (Fabian mentioned above) > >> > >>>> val result = > >> > >>>> tab.window(Tumble ... as 'w) > >> > >>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key. > >> > >>>> .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2) > >> > >>>> .select('k1, 'col1, 'w.rowtime as 'rtime) > >> > >>>> > >> > >>>> Approach 2: Similar to Fabian‘s approach, which the result schema > >> > would > >> > >>> be > >> > >>>> clearly defined, but add a built-in append UDF. That make > >> > >>>> map/flatmap/agg/flatAgg interface only accept one Expression. > >> > >>>> val result = > >> > >>>> tab.map(append(udf('string), 'long, 'proctime)) as ('col1, > >> > >>>> 'col2, > >> > >>>> 'long, 'proctime) > >> > >>>> .window(Tumble over 5.millis on 'proctime as 'w) > >> > >>>> > >> > >>>> Note: Append is a special UDF for built-in that can pass through > >> > >>>> any > >> > >>>> column. > >> > >>>> > >> > >>>> So, May be we can defined the as table.map(Expression) first, > If > >> > >>>> necessary, we can extend to table.map(Expression*) in the future > >> > >>>> ? > >> > Of > >> > >>>> course, I also hope that we can do more perfection in this > >> > >>>> proposal > >> > >>> through > >> > >>>> discussion. 
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Jincheng
>>>>>>>>
>>>>>>>> On Wed, Nov 7, 2018 at 11:45 PM, Xiaowei Jiang <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Hi Fabian,
>>>>>>>>>
>>>>>>>>> I think that the key question you raised is whether we allow extra
>>>>>>>>> parameters in the methods map/flatMap/agg/flatAgg. I can see why
>>>>>>>>> allowing that may appear more convenient in some cases. However, it
>>>>>>>>> might also cause some confusion if we do that. For example, do we
>>>>>>>>> allow multiple UDFs in these expressions? If we do, the semantics
>>>>>>>>> may be weird to define, e.g. what does
>>>>>>>>> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even
>>>>>>>>> though not allowing it may appear less powerful, it can make things
>>>>>>>>> more intuitive too. In the case of agg/flatAgg, we can define the
>>>>>>>>> keys to be implied in the result table and to appear at the
>>>>>>>>> beginning. You can use a select method if you want to modify this
>>>>>>>>> behavior. I think that eventually we will have some API which allows
>>>>>>>>> other expressions as additional parameters, but I think it's better
>>>>>>>>> to do that after we introduce the concept of nested tables. A lot of
>>>>>>>>> things we suggested here can be considered as special cases of that.
>>>>>>>>> But things are much simpler if we leave that to later.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Xiaowei
>>>>>>>>>
>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> * Re emit:
>>>>>>>>>> I think we should start with the well-understood semantics of full
>>>>>>>>>> replacement.
>>>>>>>>>> This is how the other agg functions work.
>>>>>>>>>> As was said before, there are open questions regarding an append
>>>>>>>>>> mode (checkpointing, whether to support retractions or not and, if
>>>>>>>>>> yes, how to declare them, ...).
>>>>>>>>>> Since this seems to be an optimization, I'd postpone it.
>>>>>>>>>>
>>>>>>>>>> * Re grouping keys:
>>>>>>>>>> I don't think we should automatically add them because the result
>>>>>>>>>> schema would not be intuitive.
>>>>>>>>>> Would they be added at the beginning of the tuple or at the end?
>>>>>>>>>> What metadata fields of windows would be added? In which order
>>>>>>>>>> would they be added?
>>>>>>>>>>
>>>>>>>>>> However, we could support syntax like this:
>>>>>>>>>>
>>>>>>>>>>   val t: Table = ???
>>>>>>>>>>   t
>>>>>>>>>>     .window(Tumble ... as 'w)
>>>>>>>>>>     .groupBy('a, 'b)
>>>>>>>>>>     .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
>>>>>>>>>>
>>>>>>>>>> The result schema would be clearly defined as [b, a, f1, f2, ...,
>>>>>>>>>> fn, wend, rtime]. (f1, f2, ..., fn) are the result attributes of
>>>>>>>>>> the UDF.
>>>>>>>>>>
>>>>>>>>>> * Re Multi-staged evaluation:
>>>>>>>>>> I think this should be an optimization that can be applied if the
>>>>>>>>>> UDF implements the merge() method.
>>>>>>>>>>
>>>>>>>>>> Best, Fabian
>>>>>>>>>>
>>>>>>>>>> On Wed, Nov 7, 2018 at 8:01 AM, Shaoxuan Wang <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi xiaowei,
>>>>>>>>>>>
>>>>>>>>>>> Yes, I agree with you that the semantics of TableAggregateFunction
>>>>>>>>>>> emit are much more complex than those of AggregateFunction.
>>>>>>>>>>> The fundamental difference is that TableAggregateFunction emits a
>>>>>>>>>>> "table" while AggregateFunction outputs (a column of) a "row". In
>>>>>>>>>>> the case of AggregateFunction there is only one mode, which is
>>>>>>>>>>> "replacing" (complete update). But for TableAggregateFunction, it
>>>>>>>>>>> could be an incremental update (only emit the newly updated
>>>>>>>>>>> results) or a complete update (always emit the entire table when
>>>>>>>>>>> "emit" is triggered). From the performance perspective, we might
>>>>>>>>>>> want to use incremental update. But we need to review and design
>>>>>>>>>>> this carefully, especially taking into account the cases of
>>>>>>>>>>> failover (instead of just backing up the ACC, it may also need to
>>>>>>>>>>> remember the emit offset) and retractions, as the semantics of
>>>>>>>>>>> TableAggregateFunction emit are different from those of other
>>>>>>>>>>> UDFs. TableFunction also emits a table, but it does not need to
>>>>>>>>>>> worry about this because it is stateless.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Shaoxuan
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <[hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for adding the public interfaces! I think that it's a very
>>>>>>>>>>>> good start. There are a few points that we need to discuss more.
>>>>>>>>>>>>
>>>>>>>>>>>> - TableAggregateFunction - this is a very complex beast,
>>>>>>>>>>>>   definitely the most complex user-defined object we have
>>>>>>>>>>>>   introduced so far. I think there are quite a few interesting
>>>>>>>>>>>>   questions here. For example, do we allow multi-staged
>>>>>>>>>>>>   TableAggregate in this case? What are the semantics of emit?
>>>>>>>>>>>>   Is it an amendment to the previous output, or does it replace
>>>>>>>>>>>>   it? I think that this subject itself is worth a discussion to
>>>>>>>>>>>>   make sure we get the details right.
>>>>>>>>>>>> - GroupedTable.agg - do the group keys automatically appear in
>>>>>>>>>>>>   the output? How about the case of windowing aggregation?
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <[hidden email]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi, Xiaowei,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for bringing up the discussion of the Table API
>>>>>>>>>>>>> Enhancement Outline!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I quickly looked at the overall content; it expresses our
>>>>>>>>>>>>> offline discussions well. But from my point of view, we should
>>>>>>>>>>>>> add the usage of the public interfaces that we will introduce in
>>>>>>>>>>>>> this proposal. So, I added the following usage description of
>>>>>>>>>>>>> the interfaces and operators to the Google doc:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Map Operator
>>>>>>>>>>>>> Map operator is a new operator of Table. It can apply a scalar
>>>>>>>>>>>>> function and can return multiple columns. The usage is as
>>>>>>>>>>>>> follows:
>>>>>>>>>>>>>
>>>>>>>>>>>>>   val res = tab
>>>>>>>>>>>>>     .map(fun: ScalarFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>     .select('a, 'c)
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. FlatMap Operator
>>>>>>>>>>>>> FlatMap operator is a new operator of Table. It can apply a
>>>>>>>>>>>>> table function and can return multiple rows. The usage is as
>>>>>>>>>>>>> follows:
>>>>>>>>>>>>>
>>>>>>>>>>>>>   val res = tab
>>>>>>>>>>>>>     .flatMap(fun: TableFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>     .select('a, 'c)
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3. Agg Operator
>>>>>>>>>>>>> Agg operator is a new operator of Table/GroupedTable. It can
>>>>>>>>>>>>> apply an aggregate function and can return multiple columns. The
>>>>>>>>>>>>> usage is as follows:
>>>>>>>>>>>>>
>>>>>>>>>>>>>   val res = tab
>>>>>>>>>>>>>     .groupBy('a) // leave groupBy-Clause out to define global aggregates
>>>>>>>>>>>>>     .agg(fun: AggregateFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>     .select('a, 'c)
>>>>>>>>>>>>>
>>>>>>>>>>>>> 4. FlatAgg Operator
>>>>>>>>>>>>> FlatAgg operator is a new operator of Table/GroupedTable. It can
>>>>>>>>>>>>> apply a table aggregate function and can return multiple rows.
>>>>>>>>>>>>> The usage is as follows:
>>>>>>>>>>>>>
>>>>>>>>>>>>>   val res = tab
>>>>>>>>>>>>>     .groupBy('a) // leave groupBy-Clause out to define global table aggregates
>>>>>>>>>>>>>     .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>     .select('a, 'c)
>>>>>>>>>>>>>
>>>>>>>>>>>>> 5. TableAggregateFunction
>>>>>>>>>>>>> The behavior of table aggregates is most like that of
>>>>>>>>>>>>> GroupReduceFunction, which is computed over a group of elements
>>>>>>>>>>>>> and outputs a group of elements. The TableAggregateFunction can
>>>>>>>>>>>>> be applied on GroupedTable.flatAgg(). The interface of
>>>>>>>>>>>>> TableAggregateFunction has a lot of content, so I don't copy it
>>>>>>>>>>>>> here. Please look at the details in the Google doc:
>>>>>>>>>>>>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would very much appreciate anyone reviewing and commenting.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jincheng
>>
>> --
>> *Rome was not built in one day*
|
Hi,
1.

> In fact, in addition to the design of APIs, there will be various
> performance optimization details, such as: table Aggregate function
> emitValue will generate multiple calculation results, in extreme cases,
> each record will trigger a large number of retract messages, this will have
> poor performance

Can this be solved/mitigated by emitting the results only on watermarks? I think that was the path that we decided to take both for Temporal Joins and upsert stream conversion. I know that this increases the latency, and there is a place for a future global setting/user preference "emit the data ASAP mode", but emitting only on watermarks seems to me a better/more sane default.

2. With respect to the API discussion and implicit columns: the problem for me so far is that I'm not sure I like the additional complexity of the `append()` solution, while implicit columns are definitely not in the spirit of SQL. Neither joins nor aggregations add extra unexpected columns to the result without asking. This can definitely be confusing for users, since it breaks the convention. Thus, of those 3 options, I would lean towards Fabian's proposal of a multi-argument `map(Expression*)`.

3. Another topic is that I'm not 100% convinced that we should be adding new API functions for `map`, `aggregate`, `flatMap` and `flatAggregate`. I think the same could be achieved by changing

table.map(F('x))

into

table.select(F('x)).unnest()
or
table.select(F('x).unnest())

where `unnest()` means unnesting a row/tuple type into a columnar table.

table.flatMap(F('x))

could, on the other hand, also be handled by

table.select(F('x))

by correctly deducing that F(x) is a multi-row output function.

The same might apply to `aggregate(F('x))`, but this could maybe be replaced by:

table.groupBy(…).select(F('x).unnest())

Adding scalar functions should also be possible:

table.groupBy('k).select(F('x).unnest(), 'k)

Maybe such an approach would allow us to implement the same features in SQL as well?
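To make the `map` versus `select(...).unnest()` comparison in point 3 concrete, here is a minimal sketch in plain Scala collections. It is not Flink code: `Table`, `map`, `select` and `unnest` below are hypothetical stand-ins written only for this illustration, and `splitName` plays the role of `F`. Under those assumptions, both spellings produce the same columnar result.

```scala
// Plain-Scala model of the idea; every name here is hypothetical, not a Flink API.
object UnnestSketch {
  type Row = Vector[Any]

  final case class Table(columns: Vector[String], rows: Vector[Row]) {
    // Proposed map(): apply a row-valued function and expose its fields as columns.
    def map(f: Row => Row, outColumns: Vector[String]): Table =
      Table(outColumns, rows.map(f))

    // select() with a row-valued expression: a single column holding the nested row.
    def select(f: Row => Row, nestedName: String): Table =
      Table(Vector(nestedName), rows.map(r => Vector(f(r))))

    // unnest(): flatten the single nested-row column into top-level columns.
    def unnest(outColumns: Vector[String]): Table =
      Table(outColumns, rows.map { r =>
        r.head match {
          case nested: Vector[_] => nested.asInstanceOf[Row]
          case other             => Vector(other)
        }
      })
  }

  // Stand-in for F('x): a scalar function that returns two fields per input row.
  val splitName: Row => Row = r => {
    val parts = r(0).toString.split(" ", 2)
    Vector(parts(0), if (parts.length > 1) parts(1) else "")
  }

  val input: Table =
    Table(Vector("x"), Vector(Vector("Ada Lovelace"), Vector("Alan Turing")))

  // table.map(F('x))
  val viaMap: Table = input.map(splitName, Vector("first", "last"))

  // table.select(F('x)).unnest()
  val viaSelect: Table =
    input.select(splitName, "f").unnest(Vector("first", "last"))
}
```

In this model the only difference between the two spellings is whether the flattening step is implicit (`map`) or explicit (`unnest`), which is the trade-off point 3 is about.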
Piotrek

> On 21 Nov 2018, at 09:43, Hequn Cheng <[hidden email]> wrote:
>
> Hi,
>
> Thank you all for the great proposal and discussion!
> I also prefer to move on to the next step, so +1 for opening the JIRAs to
> start the work.
> We can have a more detailed discussion there. Btw, we can start with the
> JIRAs which we have agreed on.
>
> Best,
> Hequn
>
> On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <[hidden email]> wrote:
>
>> +1. I agree that we should open the JIRAs to start the work. We may
>> have better ideas on the flavor of the interface when we implement/review
>> the code.
>>
>> Regards,
>> shaoxuan
>>
>> On 11/20/18, jincheng sun <[hidden email]> wrote:
>>> Hi all,
>>>
>>> Thanks all for the feedback.
>>>
>>> @Piotr About not using abbreviated names: +1, I like your proposal!
>>> Currently both the DataSet and DataStream APIs use `aggregate`. BTW, I
>>> find that other languages, such as R, also do not use abbreviated names.
>>>
>>> Sometimes the interface of an API is really difficult to perfect. We need
>>> to spend a lot of time thinking, gather feedback from a large number of
>>> users, and constantly improve, but because of backward compatibility we
>>> have to adopt the most conservative approach when designing the API (of
>>> course, I am more in favor of developing richer features once we have
>>> discussed them clearly). Therefore, I propose to divide the
>>> implementation of map/flatMap/agg/flatAgg into JIRAs for the basic
>>> functions and JIRAs that support time attributes and groupKeys. We can
>>> develop the features whose design we have already agreed on. And we will
>>> continue to discuss the uncertain design.
>>>
>>> In fact, in addition to the design of the APIs, there will be various
>>> performance optimization details. For example, the table aggregate
>>> function's emitValue will generate multiple calculation results; in
>>> extreme cases, each record will trigger a large number of retract
>>> messages, which will perform poorly. So we will also optimize the
>>> interface design, such as adding the emitWithRetractValue interface (I
>>> have updated the Google doc) to allow the user to optionally perform
>>> incremental calculations, thus avoiding a large number of retracts.
>>> Details like this are difficult to fully discuss on the mailing list, so
>>> I recommend creating the JIRAs/FLIP first: we develop the designs that
>>> have been agreed upon and continue to discuss the designs that are still
>>> undecided! What do you think? @Fabian & Piotr & XiaoWei
>>>
>>> Best,
>>> Jincheng
>>>
>>> On Mon, Nov 19, 2018 at 12:07 AM, Xiaowei Jiang <[hidden email]> wrote:
>>>
>>>> Hi Fabian & Piotr, thanks for the feedback!
>>>>
>>>> I appreciate your concerns, both on timestamp attributes and on implicit
>>>> group keys. At the same time, I'm also concerned about the proposed
>>>> approach of allowing Expression* as parameters, especially for
>>>> flatMap/flatAgg. So far, we never allowed a scalar expression to appear
>>>> together with table expressions. With the Expression* approach, this
>>>> will happen for the parameters to flatMap/flatAgg. I'm a bit concerned
>>>> about whether we fully understand the consequences when we try to extend
>>>> our system in the future. I would be extra cautious in doing this. To
>>>> avoid this, I think an implicit group key for flatAgg is safer. For
>>>> flatMap, if users want to keep the rowtime column, they can use
>>>> crossApply/join instead. So we are not losing any real functionality
>>>> here.
>>>>
>>>> Also a clarification on the following example:
>>>>
>>>>   tab.window(Tumble ... as 'w)
>>>>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>     .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>     .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>
>>>> If we did not have the select clause in this example, we would have 'w
>>>> as a regular column in the output. It should not magically disappear.
>>>>
>>>> The concern is not as strong for Table.map/Table.agg because we are not
>>>> mixing scalar and table expressions. But we also want to be a bit
>>>> consistent with these methods. If we used implicit group keys for
>>>> Table.flatAgg, we probably should do the same for Table.agg. Now we only
>>>> have to choose what to do with Table.map. I can see good arguments from
>>>> both sides. But starting with a single Expression seems safer because we
>>>> can always extend to Expression* in the future.
>>>>
>>>> While thinking about this problem, it appears that we may need more work
>>>> in our handling of watermarks for the SQL/Table API. Our current way of
>>>> propagating the watermarks from source all the way to sink might not be
>>>> optimal. For example, after a tumbling window, the watermark can
>>>> actually be advanced to just before the expiry of the next window. I
>>>> think that in general, each operator may need to generate new watermarks
>>>> instead of simply propagating them. Once we accept that watermarks may
>>>> change during the execution, it appears that the timestamp columns may
>>>> also change, as long as we have some way to associate a watermark with
>>>> them. My intuition is that once we have a thorough solution for the
>>>> watermark issue, we may be able to solve the problem we encountered for
>>>> Table.map in a cleaner way. But this is a complex issue which deserves a
>>>> discussion of its own.
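The remark above about advancing the watermark after a tumbling window can be illustrated with a little arithmetic. The sketch below is not Flink code and rests on assumptions that are not stated in the thread: windows are aligned at 0 and cover `[k * size, (k + 1) * size)`, a window fires once the watermark reaches `end - 1`, each window result carries the timestamp `end - 1`, and there is no allowed lateness. The names `nextUnfiredWindowEnd` and `advancedWatermark` are hypothetical.

```scala
// Illustrative arithmetic only; the names and the firing rule are assumptions.
object WatermarkSketch {
  // End (exclusive) of the earliest tumbling window that has not fired yet,
  // i.e. the first window whose last timestamp (end - 1) is above the watermark.
  def nextUnfiredWindowEnd(watermark: Long, size: Long): Long =
    (Math.floorDiv(watermark + 1, size) + 1) * size

  // Largest watermark the window operator could forward downstream: the next
  // result will carry timestamp end - 1, so nothing at or below end - 2 can follow.
  def advancedWatermark(watermark: Long, size: Long): Long =
    nextUnfiredWindowEnd(watermark, size) - 2
}
```

With a window size of 10, an incoming watermark of 12 means the window ending at 10 has already fired and the next result will be stamped 19, so under these assumptions the operator could forward a watermark of 18 instead of 12.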
>>>>
>>>> Regards,
>>>> Xiaowei
>>>>
>>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <[hidden email]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Isn't the problem of multiple expressions limited only to `flat***`
>>>>> functions and, to be more specific, only to having two (or more)
>>>>> different table functions passed as expressions?
>>>>> `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))`
>>>>> seems to be well defined (duplicate the result of every scalar function
>>>>> to every record). Or am I missing something?
>>>>>
>>>>> Another remark: I would be in favour of not using abbreviations and
>>>>> naming `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
>>>>>
>>>>> Piotrek
>>>>>
>>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <[hidden email]> wrote:
>>>>>>
>>>>>> Hi Jincheng,
>>>>>>
>>>>>> I said before that I think the append() method is better than
>>>>>> implicitly forwarding keys, but still, I believe it adds unnecessary
>>>>>> boilerplate code.
>>>>>>
>>>>>> Moreover, I haven't seen a convincing argument why map(Expression*) is
>>>>>> worse than map(Expression). In either case we need to do all kinds of
>>>>>> checks to prevent invalid use of functions.
>>>>>> If the method is not correctly used, we can emit a good error message,
>>>>>> and documenting map(Expression*) will be easier than
>>>>>> map(append(Expression*)), in my opinion.
>>>>>> I think we should not add unnecessary syntax unless there is a good
>>>>>> reason and, to be honest, I haven't seen this reason yet.
>>>>>>
>>>>>> Regarding the groupBy.agg() method, I think it should behave just like
>>>>>> any other method, i.e., not do any implicit forwarding.
>>>>>> Let's take the example of the windowed group by that you posted
>>>>>> before.
>>>>>>
>>>>>>   tab.window(Tumble ... as 'w)
>>>>>>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>     .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>     .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>
>>>>>> What happens if 'w.rowtime is not selected? What is the data type of
>>>>>> the field 'w in the resulting Table? Is it a regular field at all or
>>>>>> just a system field that disappears if it is not selected?
>>>>>>
>>>>>> IMO, the following syntax is shorter, more explicit, and better
>>>>>> aligned with the regular window.groupBy.select aggregations that are
>>>>>> supported today.
>>>>>>
>>>>>>   tab.window(Tumble ... as 'w)
>>>>>>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>     .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> On Wed, Nov 14, 2018 at 8:37 AM, jincheng sun <[hidden email]> wrote:
>>>>>>
>>>>>>> Hi Fabian/Xiaowei,
>>>>>>>
>>>>>>> I am very sorry for my late reply! Glad to see your replies, they
>>>>>>> sound pretty good!
>>>>>>> I agree that the approach with append(), which clearly defines the
>>>>>>> result schema, is the better one, as Fabian mentioned.
>>>>>>> In addition, append() can also carry non-time attributes, e.g.:
>>>>>>>
>>>>>>>   tab('name, 'age, 'address, 'rowtime)
>>>>>>>   tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2,
>>>>>>>     'address, 'rowtime)
>>>>>>>     .window(Tumble over 5.millis on 'rowtime as 'w)
>>>>>>>     .groupBy('w, 'address)
>>>>>>>
>>>>>>> In this way append() is very useful, and the behavior is very similar
>>>>>>> to withForwardedFields() in DataSet.
>>>>>>> So +1 to using the append() approach for map() and flatMap()!
>>>>>>>
>>>>>>> But how about agg() and flatAgg()? In the agg/flatAgg case, I agree
>>>>>>> with Xiaowei's approach of defining the keys to be implied in the
>>>>>>> result table and to appear at the beginning, for example:
>>>>>>>
>>>>>>>   tab.window(Tumble ... as 'w)
>>>>>>>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>     .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>     .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>
>>>>>>> What do you think? @Fabian @Xiaowei
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Jincheng
|
Hi Piotrek,
Thanks for your feedback, and thanks for sharing your thoughts! #1) No, watermarks solve the problem of late events. Here, the performance problem is caused by the update emit mode, i.e., whenever the current calculation result is output, the previous calculation result needs to be retracted. #2) As I mentioned above, we should continue the discussion until we solve the problems raised by Xiaowei and Fabian. #3) I still hope to keep the simplicity of select supporting only scalar projections; we can hardly define the semantics of tab.select(flatmap('a), 'b, flatmap('d)). Thanks, Jincheng |
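To make the retraction cost from point #1 concrete, here is a minimal sketch in plain Scala with no Flink dependency. It assumes a hypothetical Top-2 table aggregate and compares two emit strategies: full replacement (retract everything previously emitted, then emit the whole result again) and an incremental variant that only emits the difference. All names (`RetractSketch`, `topN`, `fullReplace`, `incremental`) are illustrative assumptions, not part of the Table API or of the proposed `TableAggregateFunction` interface.

```scala
// Plain-Scala simulation; no Flink dependency. All names are hypothetical.
object RetractSketch {
  sealed trait Msg
  final case class Retract(row: Int) extends Msg
  final case class Insert(row: Int) extends Msg

  // Accumulator update for a Top-N table aggregate: keep the n largest values.
  def topN(acc: List[Int], value: Int, n: Int): List[Int] =
    (value :: acc).sorted(Ordering[Int].reverse).take(n)

  // Full replacement: retract every previously emitted row,
  // then emit the complete new result.
  def fullReplace(prev: List[Int], next: List[Int]): List[Msg] =
    prev.map(Retract(_)) ++ next.map(Insert(_))

  // Incremental: retract only rows that left the result,
  // insert only rows that entered it.
  def incremental(prev: List[Int], next: List[Int]): List[Msg] =
    prev.diff(next).map(Retract(_)) ++ next.diff(prev).map(Insert(_))

  // Total number of messages emitted for an input sequence under a strategy.
  def countMessages(input: Seq[Int], n: Int)(
      emit: (List[Int], List[Int]) => List[Msg]): Int = {
    val (_, total) = input.foldLeft((List.empty[Int], 0)) {
      case ((acc, count), v) =>
        val next = topN(acc, v, n)
        (next, count + emit(acc, next).size)
    }
    total
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(5, 3, 8, 1, 2, 9)
    // Full replacement emits 20 messages for this input; incremental emits 6.
    println(s"full replacement: ${countMessages(input, 2)(fullReplace)} messages")
    println(s"incremental:      ${countMessages(input, 2)(incremental)} messages")
  }
}
```

In this toy run, records that do not change the Top-2 result (the values 1 and 2) still cost four messages each under full replacement and none under the incremental strategy, which is the kind of overhead the update emit mode introduces.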
Hi Shaoxuan & Hequn,
Thanks for your suggestion,I'll file the JIRAs later. We can prepare PRs while continuing to move forward the ongoing discussion. Regards, Jincheng jincheng sun <[hidden email]> 于2018年11月21日周三 下午7:07写道: > Hi Piotrek, > Thanks for your feedback, and thanks for share your thoughts! > > #1) No,watermark solves the issue of the late event. Here, the performance > problem is caused by the update emit mode. i.e.: When current calculation > result is output, the previous calculation result needs to be retracted. > #2) As I mentioned above we should continue the discussion until we solve > the problems raised by Xiaowei and Fabian. > #3)I still hope to keep the simplicity that select only support projected > scalar, we can hardly tell the semantics of tab.select(flatmap('a), 'b, > flatmap('d)). > > Thanks, > Jincheng > > Piotr Nowojski <[hidden email]> 于2018年11月21日周三 下午5:24写道: > >> Hi, >> >> 1. >> >> > In fact, in addition to the design of APIs, there will be various >> > performance optimization details, such as: table Aggregate function >> > emitValue will generate multiple calculation results, in extreme cases, >> > each record will trigger a large number of retract messages, this will >> have >> > poor performance >> >> Can this be solved/mitigated by emitting the results only on watermarks? >> I think that was the path that we decided to take both for Temporal Joins >> and upsert stream conversion. I know that this increases the latency and >> there is a place for a future global setting/user preference “emit the data >> ASAP mode”, but emitting only on watermarks seems to me as a better/more >> sane default. >> >> 2. >> >> With respect to the API discussion and implicit columns. The problem for >> me so far is I’m not sure if I like the additionally complexity of >> `append()` solution, while implicit columns are definitely not in the >> spirit of SQL. Neither joins nor aggregations add extra unexpected columns >> to the result without asking. 
This definitely can be confusing for the >> users since it brakes the convention. Thus I would lean towards Fabian’s >> proposal of multi-argument `map(Expression*)` from those 3 options. >> >> 3. >> >> Another topic is that I’m not 100% convinced that we should be adding new >> api functions for `map`,`aggregate`,`flatMap` and `flatAggregate`. I think >> the same could be achieved by changing >> >> table.map(F('x)) >> >> into >> >> table.select(F('x)).unnest() >> or >> table.select(F('x).unnest()) >> >> Where `unnest()` means unnest row/tuple type into a columnar table. >> >> table.flatMap(F('x)) >> >> Could be on the other hand also handled by >> >> table.select(F('x)) >> >> By correctly deducing that F(x) is a multi row output function >> >> Same might apply to `aggregate(F('x))`, but this maybe could be replaced >> by: >> >> table.groupBy(…).select(F('x).unnest()) >> >> Adding scalar functions should also be possible: >> >> table.groupBy('k).select(F('x).unnest(), ‘k) >> >> Maybe such approach would allow us to implement the same features in the >> SQL as well? >> >> Piotrek >> >> > On 21 Nov 2018, at 09:43, Hequn Cheng <[hidden email]> wrote: >> > >> > Hi, >> > >> > Thank you all for the great proposal and discussion! >> > I also prefer to move on to the next step, so +1 for opening the JIRAs >> to >> > start the work. >> > We can have more detailed discussion there. Btw, we can start with JIRAs >> > which we have agreed on. >> > >> > Best, >> > Hequn >> > >> > On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <[hidden email]> >> wrote: >> > >> >> +1. I agree that we should open the JIRAs to start the work. We may >> >> have better ideas on the flavor of the interface when implement/review >> >> the code. >> >> >> >> Regards, >> >> shaoxuan >> >> >> >> >> >> On 11/20/18, jincheng sun <[hidden email]> wrote: >> >>> Hi all, >> >>> >> >>> Thanks all for the feedback. 

@Piotr About not using abbreviated names: +1, I like your proposal! Currently both the DataSet and DataStream APIs use `aggregate`. BTW, I find that other languages, such as R, also avoid abbreviated names.

Sometimes the interface of an API is really difficult to perfect. We need to spend a lot of time thinking, gather feedback from a large number of users, and constantly improve. But because of backward compatibility, we have to adopt the most conservative approach when designing the API. (Of course, I am in favor of developing richer features once we have discussed them clearly.) Therefore, I propose to split the implementation of map/flatMap/agg/flatAgg into JIRAs for the basic functionality and JIRAs that support time attributes and group keys. We can develop the features whose design we have already agreed on, and we will continue to discuss the uncertain parts of the design.

In fact, in addition to the design of the APIs, there will be various performance optimization details. For example, the emitValue method of a table aggregate function generates multiple calculation results; in extreme cases, each record will trigger a large number of retract messages, which performs poorly. So we will also optimize the interface design, for example by adding the emitWithRetractValue interface (I have updated the google doc) to allow the user to optionally perform incremental calculations, thus avoiding a large number of retractions. Details like this are difficult to fully discuss on the mailing list, so I recommend creating the JIRAs/FLIP first. We develop the designs that have been agreed upon and continue to discuss the designs that are still undecided! What do you think, @Fabian & Piotr & Xiaowei?

Best,
Jincheng

Xiaowei Jiang <[hidden email]> wrote on Mon, Nov 19, 2018 at 12:07 AM:

Hi Fabian & Piotr, thanks for the feedback!

I appreciate your concerns, both on timestamp attributes and on implicit group keys. At the same time, I'm also concerned about the proposed approach of allowing Expression* as parameters, especially for flatMap/flatAgg. So far, we have never allowed a scalar expression to appear together with table expressions. With the Expression* approach, this will happen for the parameters to flatMap/flatAgg. I'm a bit concerned about whether we fully understand the consequences when we try to extend our system in the future. I would be extra cautious in doing this. To avoid this, I think an implicit group key for flatAgg is safer. For flatMap, if users want to keep the rowtime column, they can use crossApply/join instead. So we are not losing any real functionality here.

Also a clarification on the following example:

    tab.window(Tumble ... as 'w)
      .groupBy('w, 'k1, 'k2) // 'w should be a group key.
      .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
      .select('k1, 'col1, 'w.rowtime as 'rtime)

If we did not have the select clause in this example, we would have 'w as a regular column in the output. It should not magically disappear.

The concern is not as strong for Table.map/Table.agg because we are not mixing scalar and table expressions. But we also want to be a bit consistent with these methods. If we used implicit group keys for Table.flatAgg, we probably should do the same for Table.agg. Now we only have to choose what to do with Table.map. I can see good arguments from both sides. But starting with a single Expression seems safer because we can always extend to Expression* in the future.

While thinking about this problem, it appears that we may need more work in our handling of watermarks for SQL/Table API. Our current way of propagating the watermarks from the source all the way to the sink might not be optimal. For example, after a tumbling window, the watermark can actually be advanced to just before the expiry of the next window. I think that in general, each operator may need to generate new watermarks instead of simply propagating them. Once we accept that watermarks may change during the execution, it appears that the timestamp columns may also change, as long as we have some way to associate a watermark with them. My intuition is that once we have a thorough solution for the watermark issue, we may be able to solve the problem we encountered for Table.map in a cleaner way. But this is a complex issue which deserves a discussion of its own.

Regards,
Xiaowei

On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <[hidden email]> wrote:

Hi,

Isn't the problem of multiple expressions limited only to `flat***` functions, and to be more specific, only to having two (or more) different table functions passed as expressions? `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))` seems to be well defined (duplicate the result of every scalar function to every record). Or am I missing something?

Another remark: I would be in favour of not using abbreviations and naming `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.

Piotrek

On 15 Nov 2018, at 14:15, Fabian Hueske <[hidden email]> wrote:

Hi Jincheng,

I said before that I think the append() method is better than implicitly forwarding keys, but I still believe it adds unnecessary boilerplate code.

Moreover, I haven't seen a convincing argument why map(Expression*) is worse than map(Expression). In either case we need to do all kinds of checks to prevent invalid use of functions.
If the method is not used correctly, we can emit a good error message, and documenting map(Expression*) will be easier than map(append(Expression*)), in my opinion.
I think we should not add unnecessary syntax unless there is a good reason, and to be honest, I haven't seen this reason yet.

Regarding the groupBy.agg() method, I think it should behave just like any other method, i.e., not do any implicit forwarding.
Let's take the example of the windowed group by that you posted before.

    tab.window(Tumble ... as 'w)
      .groupBy('w, 'k1, 'k2) // 'w should be a group key.
      .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
      .select('k1, 'col1, 'w.rowtime as 'rtime)

What happens if 'w.rowtime is not selected? What is the data type of the field 'w in the resulting Table? Is it a regular field at all, or just a system field that disappears if it is not selected?

IMO, the following syntax is shorter, more explicit, and better aligned with the regular window.groupBy.select aggregations that are supported today.

    tab.window(Tumble ... as 'w)
      .groupBy('w, 'k1, 'k2) // 'w should be a group key.
      .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))

Best, Fabian

On Wed, Nov 14, 2018 at 08:37, jincheng sun <[hidden email]> wrote:

Hi Fabian/Xiaowei,

I am very sorry for my late reply! Glad to see your replies, they sound pretty good!
I agree with Fabian that the approach with append(), which clearly defines the result schema, is better.
In addition, append() can also carry non-time attributes, e.g.:

    // tab has the schema ('name, 'age, 'address, 'rowtime)
    tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
      .window(Tumble over 5.millis on 'rowtime as 'w)
      .groupBy('w, 'address)

In this way append() is very useful, and the behavior is very similar to withForwardedFields() in DataSet.
So +1 to using the append() approach for map() and flatMap()!

But how about agg() and flatAgg()? In the agg/flatAgg case I agree with Xiaowei's approach of defining the keys to be implied in the result table and to appear at the beginning, for example as follows:

    tab.window(Tumble ... as 'w)
      .groupBy('w, 'k1, 'k2) // 'w should be a group key.
      .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
      .select('k1, 'col1, 'w.rowtime as 'rtime)

What do you think? @Fabian @Xiaowei

Thanks,
Jincheng

Fabian Hueske <[hidden email]> wrote on Fri, Nov 9, 2018 at 6:35 PM:

Hi Jincheng,

Thanks for the summary!
I like the approach with append() better than the implicit forwarding as it clearly indicates which fields are forwarded.
However, I don't see much benefit over the flatMap(Expression*) variant, as we would still need to analyze the full expression tree to ensure that at most (or exactly?) one Scalar / TableFunction is used.

Best,
Fabian

On Thu, Nov 8, 2018 at 19:25, jincheng sun <[hidden email]> wrote:

Hi all,

We are discussing this proposal in great detail. We are trying to design the API with many aspects in mind (functionality, compatibility, ease of use, etc.). I think this is a very good process. Only with such a detailed discussion can we develop the PRs more clearly and smoothly at a later stage. I am very grateful to @Fabian and @Xiaowei for sharing a lot of good ideas.

About the definition of the method signatures, I want to share my points here, which I am discussing with Fabian in the google doc (not yet completed), as follows:

Assume we have a table:

    val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)

Approach 1:

case1: Map follows Source Table

    val result =
      tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
        .window(Tumble over 5.millis on 'proctime as 'w)

case2: FlatAgg follows Window (Fabian mentioned above)

    val result =
      tab.window(Tumble ... as 'w)
        .groupBy('w, 'k1, 'k2) // 'w should be a group key.
        .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
        .select('k1, 'col1, 'w.rowtime as 'rtime)

Approach 2: Similar to Fabian's approach, in which the result schema is clearly defined, but with a built-in append UDF added. That makes the map/flatMap/agg/flatAgg interfaces accept only one Expression.

    val result =
      tab.map(append(udf('string), 'long, 'proctime)) as ('col1, 'col2, 'long, 'proctime)
        .window(Tumble over 5.millis on 'proctime as 'w)

Note: append is a special built-in UDF that can pass through any column.

So maybe we can define the signature as table.map(Expression) first and, if necessary, extend it to table.map(Expression*) in the future? Of course, I also hope that we can further refine this proposal through discussion.
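
To make the difference between the approaches above concrete, here is a minimal sketch in plain Scala collections. It is not the Flink Table API; `Input`, `udf`, `mapWithAppend` and `mapOnly` are hypothetical names chosen for illustration. It only models what the output schema looks like when pass-through columns are listed explicitly (the append() idea) compared with a map that keeps nothing but the UDF result.

```scala
object AppendSketch {
  // Hypothetical input row mirroring the ('long, 'string, 'proctime) table in the mail above.
  final case class Input(long: Long, string: String, proctime: Long)

  // Hypothetical scalar UDF that returns two columns.
  def udf(s: String): (String, Int) = (s.toUpperCase, s.length)

  // append() idea: the output schema is exactly what the expression lists,
  // i.e. the UDF columns followed by the explicitly appended columns.
  def mapWithAppend(tab: Seq[Input]): Seq[(String, Int, Long, Long)] =
    tab.map { r =>
      val (col1, col2) = udf(r.string)
      (col1, col2, r.long, r.proctime)
    }

  // Without append() and without implicit forwarding, only the UDF columns
  // survive, so no time attribute is left for a following window.
  def mapOnly(tab: Seq[Input]): Seq[(String, Int)] =
    tab.map(r => udf(r.string))

  def main(args: Array[String]): Unit = {
    val tab = Seq(Input(1L, "ab", 100L), Input(2L, "cde", 200L))
    println(mapWithAppend(tab))
    println(mapOnly(tab))
  }
}
```

In this model the schema is readable directly from the call site, which is the property both append() and the multi-argument map(Expression*) variant aim for.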

Thanks,
Jincheng

Xiaowei Jiang <[hidden email]> wrote on Wed, Nov 7, 2018 at 11:45 PM:

Hi Fabian,

I think that the key question you raised is whether we allow extra parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing that may appear more convenient in some cases. However, it might also cause some confusion if we do that. For example, do we allow multiple UDFs in these expressions? If we do, the semantics may be weird to define, e.g. what does table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even though not allowing it may appear less powerful, it can make things more intuitive too. In the case of agg/flatAgg, we can define the keys to be implied in the result table and to appear at the beginning. You can use a select method if you want to modify this behavior. I think that eventually we will have some API which allows other expressions as additional parameters, but I think it's better to do that after we introduce the concept of nested tables. A lot of the things we suggested here can be considered special cases of that. But things are much simpler if we leave that to later.
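
The ambiguity of passing two table aggregates to one flatAgg can be shown with a small plain-Scala model. This is only a sketch: `top2` and `bottom3` are hypothetical stand-ins for TableAggA and TableAggB, and the two combination rules are merely two conceivable semantics, neither of which is proposed in this thread.

```scala
object TwoTableAggs {
  // Hypothetical table aggregates: each turns the values of one group into several rows.
  def top2(values: Seq[Int]): Seq[Int] = values.sorted(Ordering[Int].reverse).take(2)
  def bottom3(values: Seq[Int]): Seq[Int] = values.sorted.take(3)

  // Candidate semantics 1: pair every row of A with every row of B.
  def crossProduct(a: Seq[Int], b: Seq[Int]): Seq[(Int, Int)] =
    for (x <- a; y <- b) yield (x, y)

  // Candidate semantics 2: align rows by position, padding the shorter side with None.
  def zipPadded(a: Seq[Int], b: Seq[Int]): Seq[(Option[Int], Option[Int])] =
    a.map(Option(_)).zipAll(b.map(Option(_)), Option.empty[Int], Option.empty[Int])

  def main(args: Array[String]): Unit = {
    val a = top2(Seq(1, 5, 3))       // two rows for this group
    val b = bottom3(Seq(7, 2, 9, 4)) // three rows for the same group
    println(crossProduct(a, b))      // six combined rows
    println(zipPadded(a, b))         // three combined rows, one of them padded
  }
}
```

Both rules are defensible, yet they produce different row counts for the same group, which is the kind of underspecified behavior the mail above wants to avoid by accepting a single function.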

Regards,
Xiaowei

On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <[hidden email]> wrote:

Hi,

* Re emit:
I think we should start with the well understood semantics of full replacement. This is how the other agg functions work.
As was said before, there are open questions regarding an append mode (checkpointing, whether to support retractions or not and, if yes, how to declare them, ...).
Since this seems to be an optimization, I'd postpone it.

* Re grouping keys:
I don't think we should automatically add them because the result schema would not be intuitive.
Would they be added at the beginning of the tuple or at the end? What metadata fields of windows would be added? In which order would they be added?

However, we could support syntax like this:

    val t: Table = ???
    t
      .window(Tumble ... as 'w)
      .groupBy('a, 'b)
      .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)

The result schema would be clearly defined as [b, a, f1, f2, ..., fn, wend, rtime]. (f1, f2, ..., fn) are the result attributes of the UDF.

* Re Multi-staged evaluation:
I think this should be an optimization that can be applied if the UDF implements the merge() method.
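
The multi-staged evaluation point can be sketched in plain Scala. This is an illustration under assumptions, not the proposed TableAggregateFunction interface: the accumulator is a hypothetical "top 2" list, and `accumulate` and `merge` are modeled as ordinary functions. The sketch shows why a merge() method is what allows local pre-aggregation: merging partial accumulators gives the same result as accumulating everything in one place.

```scala
object MultiStage {
  // Hypothetical accumulator: the two largest values seen so far, in descending order.
  type Acc = List[Int]

  // Fold one input value into the accumulator.
  def accumulate(acc: Acc, v: Int): Acc =
    (v :: acc).sorted(Ordering[Int].reverse).take(2)

  // Combine two partial accumulators into one.
  def merge(a: Acc, b: Acc): Acc =
    (a ++ b).sorted(Ordering[Int].reverse).take(2)

  // Single stage: all records of a group are accumulated in one place.
  def singleStage(input: Seq[Int]): Acc =
    input.foldLeft(List.empty[Int])(accumulate)

  // Two stages: each partition pre-aggregates locally, then the partials are merged.
  def twoStage(partitions: Seq[Seq[Int]]): Acc =
    partitions.map(singleStage).foldLeft(List.empty[Int])(merge)

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1, 5), Seq(3, 2, 4))
    println(singleStage(partitions.flatten))
    println(twoStage(partitions))
  }
}
```

For this accumulator both paths yield the same top-2 list; a function without a meaningful merge would have to fall back to single-stage evaluation.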

Best, Fabian

On Wed, Nov 7, 2018 at 08:01, Shaoxuan Wang <[hidden email]> wrote:

Hi Xiaowei,

Yes, I agree with you that the semantics of TableAggregateFunction emit is much more complex than that of AggregateFunction. The fundamental difference is that TableAggregateFunction emits a "table" while AggregateFunction outputs (a column of) a "row". AggregateFunction has only one mode, which is "replacing" (complete update). But for TableAggregateFunction, it could be an incremental update (only emit the newly updated results) or a complete update (always emit the entire table when "emit" is triggered). From the performance perspective, we might want to use incremental update. But we need to review and design this carefully, especially taking into account failover (instead of just backing up the ACC, it may also need to remember the emit offset) and retractions, as the semantics of TableAggregateFunction emit are different from those of other UDFs. TableFunction also emits a table, but it does not need to worry about this because it is stateless.
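
The cost difference between the two emit modes can be illustrated with a plain-Scala sketch. The names are hypothetical (`Change`, `completeUpdate`, `incrementalUpdate`) and the model is simplified: a "top 2" table aggregate over a single group, with change messages in the style of a retract stream. It is not the Flink runtime and ignores failover entirely.

```scala
object EmitModes {
  // Change message: add = true inserts a row, add = false retracts it.
  final case class Change(add: Boolean, value: Int)

  // Accumulator of a hypothetical Top2 table aggregate: two largest values, descending.
  def accumulate(acc: List[Int], v: Int): List[Int] =
    (v :: acc).sorted(Ordering[Int].reverse).take(2)

  // Complete update: retract the whole previous table, then emit the whole new one.
  def completeUpdate(prev: List[Int], next: List[Int]): List[Change] =
    prev.map(Change(false, _)) ++ next.map(Change(true, _))

  // Incremental update: retract only rows that left the result, add only rows that entered it.
  def incrementalUpdate(prev: List[Int], next: List[Int]): List[Change] =
    prev.diff(next).map(Change(false, _)) ++ next.diff(prev).map(Change(true, _))

  // Feed the input record by record and collect every change message that is emitted.
  def run(input: List[Int], emit: (List[Int], List[Int]) => List[Change]): List[Change] = {
    val states = input.scanLeft(List.empty[Int])(accumulate)
    states.zip(states.tail).flatMap { case (prev, next) => emit(prev, next) }
  }

  def main(args: Array[String]): Unit = {
    val input = List(1, 5, 3, 2, 4)
    println(run(input, completeUpdate).size)    // 16 messages
    println(run(input, incrementalUpdate).size) // 6 messages
  }
}
```

For this five-record input the complete-update mode emits 16 messages and the incremental mode 6. The gap grows with the size of the emitted table, which is the performance concern raised above.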

Regards,
Shaoxuan

On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <[hidden email]> wrote:

Hi Jincheng,

Thanks for adding the public interfaces! I think that it's a very good start. There are a few points that we need to discuss further.

- TableAggregateFunction - this is a very complex beast, definitely the most complex user-defined object we have introduced so far. I think there are quite a few interesting questions here. For example, do we allow multi-staged TableAggregate in this case? What are the semantics of emit? Is it an amendment to the previous output, or does it replace it? I think that this subject itself is worth a discussion to make sure we get the details right.
- GroupedTable.agg - do the group keys automatically appear in the output? How about the case of windowing aggregation?

Regards,
Xiaowei

On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <[hidden email]> wrote:

Hi, Xiaowei,

Thanks for bringing up the discussion of the Table API Enhancement Outline!

I quickly looked at the overall content; it is a good expression of our offline discussions. But from my point of view, we should add the usage of the public interfaces that we will introduce in this proposal. So, I added the following usage descriptions of the interfaces and operators to the google doc:

1. Map Operator
Map is a new operator of Table. It applies a scalar function and can return multiple columns. The usage is as follows:

    val res = tab
      .map(fun: ScalarFunction).as('a, 'b, 'c)
      .select('a, 'c)

2. FlatMap Operator
FlatMap is a new operator of Table. It applies a table function and can return multiple rows. The usage is as follows:

    val res = tab
      .flatMap(fun: TableFunction).as('a, 'b, 'c)
      .select('a, 'c)

3. Agg Operator
Agg is a new operator of Table/GroupedTable. It applies an aggregate function and can return multiple columns. The usage is as follows:

    val res = tab
      .groupBy('a) // leave the groupBy clause out to define global aggregates
      .agg(fun: AggregateFunction).as('a, 'b, 'c)
      .select('a, 'c)

4. FlatAgg Operator
FlatAgg is a new operator of Table/GroupedTable. It applies a table aggregate function and can return multiple rows. The usage is as follows:

    val res = tab
      .groupBy('a) // leave the groupBy clause out to define global table aggregates
      .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
      .select('a, 'c)

5. TableAggregateFunction
The behavior of table aggregates is most like that of GroupReduceFunction, which computes over a group of elements and outputs a group of elements. The TableAggregateFunction can be applied on GroupedTable.flatAgg(). The interface of TableAggregateFunction has a lot of content, so I don't copy it here. Please look at the details in the google doc:

https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit

I would very much appreciate anyone reviewing and commenting.
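
The row/column behavior of the four operators listed above can be modeled with ordinary Scala collections. This is a sketch for intuition only: the function names `mapOp`, `flatMapOp`, `aggOp` and `flatAggOp` are hypothetical, rows are plain tuples, and none of this is the proposed Table API.

```scala
object OperatorSemantics {
  // map: one input row -> exactly one output row (possibly with several columns).
  def mapOp[A, B](tab: Seq[A])(f: A => B): Seq[B] = tab.map(f)

  // flatMap: one input row -> zero or more output rows.
  def flatMapOp[A, B](tab: Seq[A])(f: A => Seq[B]): Seq[B] = tab.flatMap(f)

  // agg: one group -> exactly one output row (possibly with several columns).
  def aggOp[A, K, B](tab: Seq[A])(key: A => K)(f: Seq[A] => B): Map[K, B] =
    tab.groupBy(key).map { case (k, rows) => k -> f(rows) }

  // flatAgg: one group -> zero or more output rows.
  def flatAggOp[A, K, B](tab: Seq[A])(key: A => K)(f: Seq[A] => Seq[B]): Map[K, Seq[B]] =
    tab.groupBy(key).map { case (k, rows) => k -> f(rows) }

  def main(args: Array[String]): Unit = {
    val tab = Seq(("a", 3), ("a", 5), ("b", 1), ("a", 4))
    // One row in, one three-column row out.
    println(mapOp(tab) { case (k, v) => (k, v, v * v) })
    // One row in, two rows out.
    println(flatMapOp(tab) { case (k, v) => Seq((k, v - 1), (k, v + 1)) })
    // One group in, one (sum, count) row out.
    println(aggOp(tab)(_._1)(rows => (rows.map(_._2).sum, rows.size)))
    // One group in, up to two rows out (the two largest values).
    println(flatAggOp(tab)(_._1)(rows => rows.map(_._2).sorted.reverse.take(2)))
  }
}
```

The only difference between the two grouped variants in this model is the shape of the per-group result, a single row versus a sequence of rows, which mirrors the multi-column versus multi-row distinction in the list above.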

Best,
Jincheng

--
*Rome was not built in one day*