Hi,
my use case is the following:

I have a Tuple2<String,Long>. I want to group by the String and sum up the Long values accordingly. This works fine with these lines:

DataSet<Lineitem> lineitems = getLineitemDataSet(env);
lineitems.project(new int[]{3, 0}).groupBy(0).aggregate(Aggregations.SUM, 1);

After the aggregation I want to print the 10 groups with the highest sum, like:

string1, 100L
string2, 50L
string3, 1L

I tried this:

lineitems.project(new int[]{3, 0}).groupBy(0).aggregate(Aggregations.SUM, 1)
    .groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print();

But instead of 3 records, I get a lot more.

Can you see my error?

Best regards,
Felix
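For readers without a Flink setup, the aggregation step Felix describes can be modelled in plain Java. This is a minimal sketch and NOT Flink code: `Pair` is a hypothetical stand-in for `Tuple2<String, Long>`, and a `TreeMap` replaces the distributed grouping. It shows the property that matters for the rest of the thread: after summing, every key occurs exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of groupBy(0).aggregate(Aggregations.SUM, 1); this is NOT Flink code.
// Pair is a hypothetical stand-in for Flink's Tuple2<String, Long>.
final class SumPerKey {
    record Pair(String key, long value) {}

    static List<Pair> sumPerKey(List<Pair> input) {
        // TreeMap only makes the example's output order deterministic;
        // a grouped Flink result comes with no ordering guarantee.
        Map<String, Long> sums = new TreeMap<>();
        for (Pair p : input) {
            sums.merge(p.key(), p.value(), Long::sum);
        }
        List<Pair> out = new ArrayList<>();
        sums.forEach((k, v) -> out.add(new Pair(k, v)));
        return out;
    }

    public static void main(String[] args) {
        List<Pair> lineitems = List.of(
                new Pair("string1", 60L), new Pair("string2", 50L),
                new Pair("string1", 40L), new Pair("string3", 1L));
        // One row per distinct key: string1 -> 100, string2 -> 50, string3 -> 1
        System.out.println(sumPerKey(lineitems));
    }
}
```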
If I remember correctly, first() returns the first n values for every group. The Javadocs actually don't make this behaviour very clear.

(In reply to Felix Neutatz, 21.01.2015 19:18)
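The per-group behaviour explains the surplus records: the second groupBy(0) groups on the already-unique key, so each group holds a single record and first(3) keeps all of them. The plain-Java sketch below models `groupBy(0).sortGroup(1, Order.DESCENDING).first(n)` under that reading; it is not Flink code, and `Pair` is a hypothetical stand-in for `Tuple2<String, Long>`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT Flink code) of groupBy(0).sortGroup(1, Order.DESCENDING).first(n).
final class GroupedFirst {
    record Pair(String key, long value) {}

    // n records are kept for EVERY group, not n records overall.
    static List<Pair> firstPerGroup(List<Pair> input, int n) {
        Map<String, List<Pair>> groups = new LinkedHashMap<>();
        for (Pair p : input) {
            groups.computeIfAbsent(p.key(), k -> new ArrayList<>()).add(p);
        }
        List<Pair> out = new ArrayList<>();
        for (List<Pair> group : groups.values()) {
            group.stream()
                    .sorted(Comparator.comparingLong(Pair::value).reversed())
                    .limit(n)
                    .forEach(out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        // Already aggregated: every key occurs exactly once.
        List<Pair> sums = List.of(
                new Pair("string1", 100L), new Pair("string2", 50L),
                new Pair("string3", 1L), new Pair("string4", 7L));
        // Each group has one record, so first(3) keeps all four.
        System.out.println(firstPerGroup(sums, 3).size());
    }
}
```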
Chesnay is right.
What you want is a non-grouped sort/first, which would need to be added...

Stephan

(In reply to Chesnay Schepler, 21.01.2015 11:25)
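The non-grouped operation Stephan describes means "sort the whole data set, then take the first n". A minimal plain-Java sketch of those semantics follows; it is not a Flink API, and `Pair` and `topN` are hypothetical names used only for illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java model (NOT Flink code) of a non-grouped sort followed by first(n).
final class GlobalTopN {
    record Pair(String key, long value) {}

    static List<Pair> topN(List<Pair> input, int n) {
        return input.stream()
                .sorted(Comparator.comparingLong(Pair::value).reversed())
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Pair> sums = List.of(
                new Pair("string1", 100L), new Pair("string3", 1L),
                new Pair("string4", 7L), new Pair("string2", 50L));
        // Exactly three rows: string1 (100), string2 (50), string4 (7)
        topN(sums, 3).forEach(p -> System.out.println(p.key() + ", " + p.value() + "L"));
    }
}
```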
Chesnay is right.
Right now, it is not possible to do what you want in a straightforward way, because Flink does not support fully sorting a data set (there are several related issues in JIRA).

A workaround would be to attach a constant value to each tuple, group on that (so all tuples are sent to the same group), sort that group, and apply the first operator.

(In reply to Chesnay Schepler, 2015-01-21 20:22 GMT+01:00)
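Fabian's workaround can be modelled step by step in plain Java. This is a sketch, not Flink code: `Pair` and `Tagged` are hypothetical stand-ins for `Tuple2<String, Long>` and a `Tuple3<Integer, String, Long>` carrying the constant. Because every record gets the same key, there is exactly one group, so "first n per group" becomes "first n overall". In Flink this funnels all records through one group, so a single parallel task does the sorting; that is presumably acceptable here because the input is already reduced to one row per key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java model of the constant-key workaround; NOT Flink code.
// Pair and Tagged are hypothetical stand-ins for Tuple2<String, Long> and Tuple3<Integer, String, Long>.
final class ConstantKeyTopN {
    record Pair(String key, long value) {}
    record Tagged(int tag, String key, long value) {}

    // Step 1: attach the same constant (0) to every record.
    static List<Tagged> tagAll(List<Pair> input) {
        return input.stream()
                .map(p -> new Tagged(0, p.key(), p.value()))
                .collect(Collectors.toList());
    }

    // Steps 2-4: group on the constant, sort each group descending by value, keep n per group.
    static List<Tagged> firstPerGroup(List<Tagged> input, int n) {
        Map<Integer, List<Tagged>> groups = new LinkedHashMap<>();
        for (Tagged t : input) {
            groups.computeIfAbsent(t.tag(), k -> new ArrayList<>()).add(t);
        }
        List<Tagged> out = new ArrayList<>();
        for (List<Tagged> group : groups.values()) {
            group.stream()
                    .sorted(Comparator.comparingLong(Tagged::value).reversed())
                    .limit(n)
                    .forEach(out::add);
        }
        return out;
    }

    // With a single group, "n per group" equals "n overall"; drop the constant again.
    static List<Pair> topN(List<Pair> input, int n) {
        return firstPerGroup(tagAll(input), n).stream()
                .map(t -> new Pair(t.key(), t.value()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Pair> sums = List.of(
                new Pair("string1", 100L), new Pair("string3", 1L),
                new Pair("string4", 7L), new Pair("string2", 50L));
        System.out.println(topN(sums, 3));
    }
}
```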
Thanks, @Fabian, your workaround works :)
But I think this feature is really missing. Shall we add this functionality natively or via the proposed lib package?

(In reply to Fabian Hueske, 2015-01-21 20:38 GMT+01:00)
This should directly go into the API, IMO.
As I said, there are several open JIRAs for this issue.

(In reply to Felix Neutatz, 2015-01-21 22:29 GMT+01:00)
BTW, I also think that global sorting is an important feature that is definitely missing in Flink (FLINK-598). Enabling local sorting for data sinks is one step on the way, and it can be solved rather easily (FLINK-1105). If you would like to contribute to making sorting possible, I would be very happy to guide you ;-)

Cheers,
Fabian

(In reply to Fabian Hueske, 2015-01-21 22:33 GMT+01:00)
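Why local sorting is only "one step on the way": sorting each partition on its own does not give a total order unless records were first distributed by value ranges. The plain-Java sketch below illustrates that; it assumes range partitioning plus local sort as one common route to a global sort, which the thread does not spell out, and the boundaries 10 and 60 are picked by hand for the example. It sorts ascending for simplicity and is not Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java illustration (NOT Flink code) of local sort vs. range partitioning + local sort.
final class RangeSortSketch {
    // Local sort: each partition is sorted on its own; nothing crosses partition borders.
    static List<List<Long>> sortLocally(List<List<Long>> partitions) {
        return partitions.stream()
                .map(p -> p.stream().sorted().collect(Collectors.toList()))
                .collect(Collectors.toList());
    }

    // Hypothetical range partitioner: partition i receives values in [boundaries[i-1], boundaries[i]).
    static List<List<Long>> rangePartition(List<Long> values, long[] boundaries) {
        List<List<Long>> parts = new ArrayList<>();
        for (int i = 0; i <= boundaries.length; i++) {
            parts.add(new ArrayList<>());
        }
        for (long v : values) {
            int i = 0;
            while (i < boundaries.length && v >= boundaries[i]) {
                i++;
            }
            parts.get(i).add(v);
        }
        return parts;
    }

    // Reading the partitions one after another, as a sink writing them in order would.
    static List<Long> concat(List<List<Long>> partitions) {
        return partitions.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    static boolean isSortedAscending(List<Long> xs) {
        for (int i = 1; i < xs.size(); i++) {
            if (xs.get(i - 1) > xs.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Partitions as an arbitrary (e.g. hash) partitioning might leave them.
        List<List<Long>> arbitrary = List.of(List.of(50L, 1L), List.of(100L, 7L));
        // Local sort only yields [1, 50, 7, 100]: not a total order.
        System.out.println(isSortedAscending(concat(sortLocally(arbitrary))));
        // Range-partition first, then sort locally: [1, 7, 50, 100].
        List<Long> all = List.of(50L, 1L, 100L, 7L);
        System.out.println(isSortedAscending(concat(sortLocally(rangePartition(all, new long[] {10L, 60L})))));
    }
}
```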