Hi everybody,
First, I want to introduce myself to the community. I am a PhD student who wants to work with and improve Flink.

Second, I thought I would work on improving aggregations as a start. My first goal is to simplify the computation of a field average. Basically, I want to turn this plan:

    val input = env.fromCollection( Array(1L, 2L, 3L, 4L) )

    input
      .map { in => (in, 1L) }
      .sum(0).andSum(1)
      .map { in => in._1.toDouble / in._2.toDouble }

into this:

    // val input = ...
    input.average(0).print()

My basic idea is to internally still add the counter field and execute the map and sum steps, but to hide them from the user.

Next, I want to support multiple aggregations so one can write something like:

    input.min(0).max(0).sum(0).count(0).average(0)

Internally, there should only be one pass over the input data, and average should reuse the work done by sum and count.

In September there was some discussion [1] on the semantics of the min/max aggregations vs. minBy/maxBy. The consensus was that min/max should not simply return the respective field value but return the entire tuple. However, for count/sum/average there is no specific tuple, and this approach would also not work for combinations with min/max.

One possible route is to simply return a random element, similar to MySQL. I think this can be very surprising to the user, especially when min/max are combined.

Another possibility is to return the tuple only for single invocations of min or max, and to return the field value for the other aggregation functions or for combinations. This is also inconsistent but appears to be more in line with people's expectations. Also, there might be two or more tuples with the same min/max value, and then the question is which one should be returned.

I haven't yet thought about aggregations in a streaming context and I would appreciate any input on this.

Best,
Viktor

[1] http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Aggregations-td1706.html
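For illustration, here is a plain-Java sketch of the intended one-pass evaluation. This is only the semantics the hidden dataflow would compute, not the actual operator code:

    public class SinglePassAggregation {
        public static void main(String[] args) {
            long min = Long.MAX_VALUE, max = Long.MIN_VALUE, sum = 0L, count = 0L;
            // a single pass computes min, max, sum, and count together
            for (long x : new long[] { 1L, 2L, 3L, 4L }) {
                min = Math.min(min, x);
                max = Math.max(max, x);
                sum += x;
                count++;
            }
            // average reuses the work done by sum and count
            double average = (double) sum / count;
            System.out.println(min + " " + max + " " + sum + " " + count + " " + average);
        }
    }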
Hi Viktor,
welcome on the dev mailing list! :-)

I agree that Flink's aggregations should be improved in various aspects:
- support more aggregation functions. Currently only MIN, MAX, and SUM are supported. Adding COUNT and AVG would be nice!
- support for multiple aggregations per field
- support for aggregations on POJO DataSets

How about always returning Tuples as the result of an aggregation? For example, something like:

    DataSet<Tuple2<String, Integer>> ds = ...
    DataSet<Tuple4<Tuple2<String, Integer>, Integer, Integer, Long>> result =
      ds.groupBy(0).min(1).andMax(1).andCnt();

or

    DataSet<Tuple2<String, Integer>> ds = ...
    DataSet<Tuple4<String, Integer, Integer, Long>> result =
      ds.groupBy(0).key(0).andMin(1).andMax(1).andCnt();

In the first version, an arbitrary element of the group is added to the result to identify the keys. The second example explicitly extracts the key from the original input data.

POJO data types can be handled similarly by specifying the member fields to aggregate (or copy as key) by name.

Doing aggregations "in-place" within an input data type (and leaving the other fields untouched) could be a special variant of this operator.

2014-10-31 18:50 GMT+01:00 Rosenfeld, Viktor <[hidden email]>:
> [...]
Hi Fabian,
Wouldn't it make sense to use the call to groupBy() to also extract the key fields? So in your example, the call to key(0) is redundant. If there are multiple fields specified in groupBy(), then all of them would be used as the key. If the user only wants specific keys, he can specify them by explicitly calling the key() method. Specifying a field in key() that is not used in groupBy() would be an error. This is close to (proper) SQL semantics.

What do you think?

I'm not a big fan of how MySQL lets you specify attributes that are not grouped or aggregated and returns a random element for them. (I think that's a bug in MySQL, although there's probably a reason for the behavior.)

Best,
Viktor
That sounds good to me. Although making the key copy implicit might confuse
users who need to take that into account when specifying the types of data sets or operators. Taking the key(s) from the Grouping should not be a problem.

2014-11-04 15:12 GMT+01:00 viktor.rosenfeld <[hidden email]>:
> [...]
Hi Fabian,
I ran into a problem with your syntax example:

    DataSet<Tuple2<String, Integer>> ds = ...
    DataSet<Tuple4<Tuple2<String, Integer>, Integer, Integer, Long>> result =
      ds.groupBy(0).min(1).andMax(1).andCnt();

Basically, in the example above we don't know how long the chain of aggregation method calls is. Also, each aggregation method call adds a field to the result tuple (the first aggregation call on the grouping returns a Tuple1). Because the resultType of an operator is specified in the constructor, every one of those method calls needs to create a new Operator<OUT> with the correct result type. However, only the translateToDataFlow method of the last method call in the chain should actually compute the aggregation.

This can be achieved by testing whether an aggregation method is called on an AggregationOperator. The translateToDataFlow method of the operators at the start/middle of the chain would then just return a MapOperatorBase which simply extends the tuple. The translateToDataFlow method of the last operator in the chain would return a GroupReduceOperatorBase.

This strategy seems very hackish and involves lots of unnecessary copying of tuple data. I think a better way would be to use the following syntax:

    ds.groupBy(0).aggregate(min(1), max(1), cnt())

or

    ds.groupBy(0).min(1).max(1).cnt(1).aggregate()

Here, there is only one method which creates a new operator, the aggregate method, and the final resultType is known when aggregate is called.

What do you think?

Best,
Viktor
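For illustration, here is a minimal sketch of why the varargs variant avoids the chaining problem (all names below are assumptions for illustration, not the actual Flink internals): aggregate() sees every specification at once, so the single operator and its result arity can be fixed in one constructor call.

    public class AggregateSketch {
        enum Op { MIN, MAX, COUNT }

        static class Aggregation {
            final Op op;
            final int field; // -1 for cnt(), which needs no input field
            Aggregation(Op op, int field) { this.op = op; this.field = field; }
        }

        static Aggregation min(int field) { return new Aggregation(Op.MIN, field); }
        static Aggregation max(int field) { return new Aggregation(Op.MAX, field); }
        static Aggregation cnt()          { return new Aggregation(Op.COUNT, -1); }

        // Stand-in for the operator constructor: the result arity is simply
        // specs.length, so the output type is fully known at this point.
        static String aggregate(Aggregation... specs) {
            return "Tuple" + specs.length;
        }

        public static void main(String[] args) {
            // mirrors ds.groupBy(0).aggregate(min(1), max(1), cnt())
            System.out.println(aggregate(min(1), max(1), cnt())); // prints Tuple3
        }
    }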
I like this version: ds.groupBy(0).aggregate(min(1), max(1), cnt()),
very concise.

On Mon, Nov 10, 2014 at 10:42 AM, Viktor Rosenfeld <[hidden email]> wrote:
> [...]
I also support this approach:
    ds.groupBy(0).aggregate(min(1), max(1), cnt())

I think it makes the code more readable, because it is easy to see what's in the result tuple.

Gyula

> On 10 Nov 2014, at 10:49, Aljoscha Krettek <[hidden email]> wrote:
> [...]
How/where do you plan to define the methods min(1), max(1), and cnt()?
If these are static methods in some kind of Aggregation class, it won't look so concise anymore, or am I missing something here?

I would be fine with both ways, the second one being nice, if it can be done like that.

2014-11-10 11:03 GMT+01:00 Gyula Fora <[hidden email]>:
> [...]
I guess you would need static method imports to make the code look like
this, which I think is fine.

On Mon, Nov 10, 2014 at 12:00 PM, Fabian Hueske <[hidden email]> wrote:
> [...]
Yes, you would need static method imports.

Best,
Viktor
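To make this concrete, a sketch of what the call site could look like with a static import. The holder class name and package are assumptions for illustration, not the actual layout of the branch:

    // hypothetical holder class for the factory methods
    import static org.apache.flink.api.java.aggregation.Aggregations.*;

    DataSet<Tuple2<String, Integer>> ds = ...
    // min, max, and cnt now resolve to the statically imported factories
    ds.groupBy(0).aggregate(min(1), max(1), cnt());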
Hi everybody,
I've created a GitHub branch for the new aggregation code:
https://github.com/he-sk/incubator-flink/tree/aggregation

I have implemented both of the APIs that I proposed earlier, so people can play around and decide which they like better:

    DataSet ds = ...
    ds.groupBy(0).aggregate(min(1), max(1), count())

And:

    DataSet ds = ...
    ds.groupBy(0).min(1).max(1).count().aggregate()

The second version is a thin layer on top of the first version.

The aggregation functions min, max, sum, count, and average are supported. For groupings, you can select the group keys with (multiple) key() pseudo-aggregation functions. By default, all group keys are used.

You can find examples in AggregationApi1Test.java and AggregationApi2Test.java.

Right now, only the Java API uses the new aggregation code. I've only started learning Scala, so I don't know how easy it will be to port the new API. One problem that I foresee is that the type information of the input tuples is lost. Therefore, the Scala compiler cannot do type inference on the output tuple. I hope that this can be fixed or worked around by simply specifying the output tuple type directly.

I've kept the old aggregation API but marked it deprecated and renamed some functions.

The next steps would be:

1) Implement Scala API.
2) Add support for POJOs (sync with streaming aggregations for that).

Looking forward to your input.

Best,
Viktor
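As a small usage sketch of the key() pseudo-aggregation (the exact result type shown is an assumption, not taken from the branch):

    DataSet<Tuple2<String, Integer>> ds = ...
    // copy the group key into the result, then aggregate field 1;
    // presumably yields something like Tuple3<String, Integer, Long>
    ds.groupBy(0).aggregate(key(0), min(1), count())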
Hi Viktor,
I had a look at your branch. First of all, it looks like very good work! Good code quality, lots of tests, well documented, nice!

I like the first approach (ds.aggregate(min(1), max(2), count())) much better than the other one. It basically shows how the result tuple is constructed.

I also have a few comments on the code and the overall approach:

- I would split the branch into two branches, one for each approach. That makes comparisons with master much easier.
- I am not sure about the implicit adding of key fields if they are not explicitly added by the user in the aggregation. It might confuse users if the return type looks different from what they have specified. How about having an allKeys() function that adds all keys of the grouping, and not adding keys by default?
- DataSet and UnorderedGrouping expose getter and setter methods for the AggregationOperatorFactory. These methods are public and are therefore part of the user-facing API. Can you make them private or even remove them? They are not really necessary, right?
- The aggregation GroupReduceFunction should be combinable to make it perform better, especially in the case of aggregations on ungrouped datasets. It would be even better if you could convert the GroupReduceFunction into a functional-style ReduceFunction. These functions are always combinable and can be executed using a hash-aggregation strategy once we have that feature implemented (again, better performance). However, for that you would need pre- and postprocessing MapFunctions (initialize and finalize of aggregates). On the other hand, you then only need three aggregation functions: sum, min, and max (count is a sum of ones, avg is sum/count). This design also eases the sharing of count for multiple avg aggregations.
- Some integration test cases would also be nice. See for example the tests in org.apache.flink.test.javaApiOperators.*
- We do not use @author tags in our code.
- Finally, we try to keep the documentation in sync with the code. Once your changes are ready for a PR, you should adapt the documentation in ./docs according to your changes (no need to do it at this point).

Please let me know if you have any questions.

Cheers,
Fabian

2014-11-19 16:41 GMT+01:00 Viktor Rosenfeld <[hidden email]>:
> [...]
Hi Fabian,
thanks for your feedback. See my responses below.

> - I would split the branch into two branches, one for each approach.

I've moved the changes necessary for the second approach to a branch called aggregation-alt:
https://github.com/he-sk/incubator-flink/tree/aggregation-alt

> How about having an allKeys() function that adds all keys of the grouping, and not adding keys by default?

Done. But I'm not sure about it.

It is not very clear where in the result the key fields should be added. The old code added them at the beginning. I'm now inserting them at the position where the allKeys() function is called, except for those keys that are explicitly specified elsewhere. All in all, I think that the semantics are very opaque.

> - DataSet and UnorderedGrouping expose getter and setter methods for the AggregationOperatorFactory. [...] Can you make them private or even remove them?

I need the setter to test the delegation in DataSet.aggregate(). The test is fairly trivial, but now that it's there, why remove it? I've made the getters and setters package private.

> - The aggregation GroupReduceFunction should be combinable [...] It would be even better if you could convert the GroupReduceFunction into a functional-style ReduceFunction.

The GroupReduce cannot be made combinable because it changes the output tuple type. CombineFunction.combine() requires that both the input and the output type are the same.

I changed the implementation to use 2 MapFunctions and a ReduceFunction.

Also, I implemented average so that it picks up an existing count and sum. However, if the same function is specified multiple times (e.g., 2 calls to min(0) in one aggregate), it won't be reused. The reason is that every function stores only one position of the result in the output tuple. (But two average(0) functions will use the same count and sum functions, because the results of count and sum are not represented in the output tuple.)

> - Some integration test cases would also be nice.

I've copied the tests in AggregateITCase and SumMinMaxITCase for that.

> - We do not use @author tags in our code.

Removed.

> - Finally, we try to keep the documentation in sync with the code. [...]

Do you think that for a pull request the implementation of the Scala API is necessary? Or should I create a pull request from the current code?

Cheers,
Viktor
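To illustrate the map -> reduce -> map decomposition for average, here is a plain-Java sketch of the idea, not code from the branch:

    public class AverageSketch {
        // pre-map: x -> (sum part, count part)
        static long[] init(long x) { return new long[] { x, 1L }; }

        // reduce: element-wise addition; input and output types match,
        // so this step is combinable
        static long[] combine(long[] a, long[] b) {
            return new long[] { a[0] + b[0], a[1] + b[1] };
        }

        // post-map: finalize the aggregate as sum / count
        static double finish(long[] acc) { return (double) acc[0] / acc[1]; }

        public static void main(String[] args) {
            long[] acc = init(1L);
            for (long x : new long[] { 2L, 3L, 4L }) {
                acc = combine(acc, init(x));
            }
            System.out.println(finish(acc)); // prints 2.5
        }
    }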
Hi,
why does the GroupReduce change the output type? Can this not be done in the two mappers? In my opinion, aggregations should be combinable; otherwise, performance would be severely crippled.

Cheers,
Aljoscha

On Thu, Nov 27, 2014 at 11:20 AM, Viktor Rosenfeld <[hidden email]> wrote:
> [...]
Viktor said he changed the implementation to
MapFunction -> ReduceFunction -> MapFunction.

So it is combinable :-)

2014-11-27 11:45 GMT+01:00 Aljoscha Krettek <[hidden email]>:
> [...]
Hi Viktor,
thanks for the update!

Regarding the explicit vs. implicit adding of key fields: I would only allow using either key(x) or allKeys() and throw an exception if they are mixed. I guess there won't be many situations where somebody would want to mix them anyway. No need to complicate the logic and semantics for this corner case, IMO.

Not sharing the result of two identical operations is fine. Whoever computes ds.aggregate(min(0), min(0)) deserves the overhead ;-)

We have the goal of keeping the Java and Scala APIs in sync at any point in time. You can make a pull request with only the Java changes, but it won't be merged until you (or somebody else) have adapted the Scala API. I would say, do the PR and start a discussion about it. This way, everybody can review the code more easily.

I'll have a detailed look at the changes later.

Cheers,
Fabian

2014-11-27 11:20 GMT+01:00 Viktor Rosenfeld <[hidden email]>:
> [...]
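A short usage sketch of the proposed key(x)/allKeys() rule (hypothetical calls; the error behavior is the suggestion above, not implemented code):

    ds.groupBy(0).aggregate(key(0), min(1));         // OK: explicit key copy
    ds.groupBy(0, 1).aggregate(allKeys(), count());  // OK: copies all grouping keys
    ds.groupBy(0, 1).aggregate(key(0), allKeys());   // error: mixing would throw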
Ahh, I didn't see that. My bad.
On Thu, Nov 27, 2014 at 11:47 AM, Fabian Hueske <[hidden email]> wrote:
> [...]