Very strange behaviour of groupBy() -> sort() -> first()

Felix Neutatz
Hi,

my use case is the following:

I have a Tuple2<String,Long>. I want to group by the String and sum up the
Long values accordingly. This works fine with these lines:

DataSet<Lineitem> lineitems = getLineitemDataSet(env);
lineitems.project(new int[]{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1);

After the aggregation I want to print the 10 groups with the highest sum,
like:

string1, 100L
string2, 50L
string3, 1L

I tried that:

lineitems.project(new int[]{3,0})
    .groupBy(0).aggregate(Aggregations.SUM, 1)
    .groupBy(0).sortGroup(1, Order.DESCENDING)
    .first(3).print();

But instead of 3 records, I get a lot more.

Can you see my error?

Best regards,

Felix

Re: Very strange behaviour of groupBy() -> sort() -> first()

Chesnay Schepler
If I remember correctly, first() returns the first n values for every
group. The Javadocs actually don't make this behaviour very clear.
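
To see why the chain from the original post returns more than 3 records, here is a minimal plain-Java sketch of the per-group semantics described above. It does not use the Flink API; the class name GroupedFirstDemo, the helper rec() and the sample values are made up for illustration. After the SUM aggregation every key occurs exactly once, so grouping on the key again yields groups of size one, and a per-group limit keeps all of them.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupedFirstDemo {

    // Hypothetical helper: a (String, Long) pair standing in for a Tuple2<String,Long>.
    public static Map.Entry<String, Long> rec(String key, long sum) {
        return new SimpleEntry<String, Long>(key, sum);
    }

    // Plain-Java model of groupBy(key) -> sortGroup(value, DESCENDING) -> first(n).
    // The limit n is applied inside every group, not to the result as a whole.
    public static List<Map.Entry<String, Long>> firstPerGroup(
            List<Map.Entry<String, Long>> records, int n) {
        // Partition the records by their String key, keeping first-seen key order.
        Map<String, List<Map.Entry<String, Long>>> groups = new LinkedHashMap<>();
        for (Map.Entry<String, Long> r : records) {
            groups.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r);
        }
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (List<Map.Entry<String, Long>> group : groups.values()) {
            // Sort this one group by value, largest first.
            group.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
            // Keep at most n records of this group.
            out.addAll(group.subList(0, Math.min(n, group.size())));
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up aggregated sums: one record per key, as after aggregate(SUM).
        List<Map.Entry<String, Long>> sums = Arrays.asList(
                rec("string1", 100L), rec("string2", 50L), rec("string3", 1L),
                rec("string4", 7L), rec("string5", 3L));
        // Five keys mean five groups of one record each, so nothing is cut off.
        System.out.println(firstPerGroup(sums, 3).size() + " records");
    }
}
```

With the five sample sums, the model returns all five records even though n is 3, which matches the "a lot more" records seen in the original post.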

On 21.01.2015 19:18, Felix Neutatz wrote:

> I tried that:
>
> lineitems.project(new int[]{3,0})
>     .groupBy(0).aggregate(Aggregations.SUM, 1)
>     .groupBy(0).sortGroup(1, Order.DESCENDING)
>     .first(3).print();
>
> But instead of 3 records, I get a lot more.

Re: Very strange behaviour of groupBy() -> sort() -> first()

Stephan Ewen
Chesnay is right.

What you want is a non-grouped sort/first, which would need to be added...

Stephan
On 21.01.2015 11:25, "Chesnay Schepler" <[hidden email]> wrote:

> If i remember correctly first() returns the first n values for every
> group. the javadocs actually don't make this behaviour very clear.

Re: Very strange behaviour of groupBy() -> sort() -> first()

Fabian Hueske-2
In reply to this post by Chesnay Schepler
Chesnay is right.
Right now, it is not possible to do what you want in a straightforward way
because Flink does not support fully sorting a data set (there are several
related issues in JIRA).

A workaround would be to attach a constant value to each tuple, group on
that (all tuples are sent to the same group), sort that group, and apply
the first operator.
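
The steps of this workaround can be sketched in plain Java as follows. This is only a model of the idea, not Flink code: the class name ConstantKeyTopN, the helper rec() and the sample values are assumptions for illustration. In a Flink program the same steps would presumably be a map that prepends a constant field, followed by groupBy on that field, sortGroup on the sum field, and first(n).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConstantKeyTopN {

    // Hypothetical helper: a (String, Long) pair standing in for a Tuple2<String,Long>.
    public static Map.Entry<String, Long> rec(String key, long sum) {
        return new SimpleEntry<String, Long>(key, sum);
    }

    // Model of the workaround: constant key -> one group -> sort group -> first n.
    public static List<Map.Entry<String, Long>> topN(
            List<Map.Entry<String, Long>> sums, int n) {
        final int constantKey = 0; // the same key is attached to every record

        // Step 1: group on the constant key, so all records land in one group.
        Map<Integer, List<Map.Entry<String, Long>>> groups = new HashMap<>();
        for (Map.Entry<String, Long> r : sums) {
            groups.computeIfAbsent(constantKey, k -> new ArrayList<>()).add(r);
        }

        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (List<Map.Entry<String, Long>> group : groups.values()) {
            // Step 2: sort the single group by sum, largest first.
            group.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
            // Step 3: the per-group limit now acts as a global limit.
            out.addAll(group.subList(0, Math.min(n, group.size())));
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up aggregated sums, one record per key.
        List<Map.Entry<String, Long>> sums = Arrays.asList(
                rec("string1", 100L), rec("string2", 50L), rec("string3", 1L),
                rec("string4", 7L), rec("string5", 3L));
        for (Map.Entry<String, Long> r : topN(sums, 3)) {
            System.out.println(r.getKey() + ", " + r.getValue());
        }
    }
}
```

Because every record ends up in the same group, that group is sorted in one place, so this approach trades parallelism for simplicity.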

2015-01-21 20:22 GMT+01:00 Chesnay Schepler <[hidden email]>:

> If i remember correctly first() returns the first n values for every
> group. the javadocs actually don't make this behaviour very clear.

Re: Very strange behaviour of groupBy() -> sort() -> first()

Felix Neutatz
Thanks, @Fabian, your workaround works :)

But I think this feature is really missing. Shall we add this functionality
natively or via the proposed lib package?

2015-01-21 20:38 GMT+01:00 Fabian Hueske <[hidden email]>:

> Chesnay is right.
> Right now, it is not possible to do want you want in a straightforward way
> because Flink does not support to fully sort a data set (there are several
> related issues in JIRA).
>
> A workaround would be to attach a constant value to each tuple, group on
> that (all tuples are sent to the same group), sort that group, and apply
> the first operator.

Re: Very strange behaviour of groupBy() -> sort() -> first()

Fabian Hueske
This should directly go into the API, IMO.
As I said, there are several open JIRAs for this issue.

2015-01-21 22:29 GMT+01:00 Felix Neutatz <[hidden email]>:

> Thanks, @Fabian, your workaround works :)
>
> But I think this feature is really missing. Shall we add this functionality
> natively or via the proposed lib package?

Re: Very strange behaviour of groupBy() -> sort() -> first()

Fabian Hueske
BTW, I also think that global sorting is an important feature that is
definitely missing in Flink (FLINK-598).
Enabling local sorting for data sinks is one step on the way, and it can be
solved rather easily (FLINK-1105).

If you would like to contribute to making sorting possible, I would be very
happy to guide you ;-)

Cheers, Fabian

2015-01-21 22:33 GMT+01:00 Fabian Hueske <[hidden email]>:

> This should directly go into the API, IMO.
> As I said, there are several open JIRAs for this issue.