streaming GroupBy + Fold


Martin Neumann-2
Hej,

In one of my programs I run a Fold on a GroupedDataStream. The aim is to
aggregate the values in each group.
It seems the aggregator in the Fold function is shared at the operator level,
so all groups that end up on the same operator get mashed together.

Is this the intended behavior? If so, what do I have to do to separate them?


cheers Martin
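
To make the reported symptom concrete, here is a minimal plain-Java sketch (no Flink involved) of the difference between a fold that keeps one accumulator per key and a fold that shares a single accumulator across everything that lands on the same operator instance. The class `KeyedFoldSketch`, the `Event` type, and both method names are hypothetical and only for illustration; they are not Flink APIs, and the sketch assumes an immutable accumulator type.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class KeyedFoldSketch {

    /** Hypothetical keyed record; stands in for one element of a grouped stream. */
    public static final class Event {
        public final String key;
        public final int value;

        public Event(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Expected behavior: one accumulator per key, so groups stay separate. */
    public static <A> Map<String, A> foldPerKey(
            List<Event> events, A initial, BiFunction<A, Event, A> folder) {
        Map<String, A> accumulators = new LinkedHashMap<>();
        for (Event e : events) {
            // Start from the initial value the first time a key is seen.
            A current = accumulators.getOrDefault(e.key, initial);
            accumulators.put(e.key, folder.apply(current, e));
        }
        return accumulators;
    }

    /** Reported symptom: a single accumulator shared by all keys on the operator. */
    public static <A> A foldShared(
            List<Event> events, A initial, BiFunction<A, Event, A> folder) {
        A acc = initial;
        for (Event e : events) {
            acc = folder.apply(acc, e);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("a", 1), new Event("b", 10),
                new Event("a", 2), new Event("b", 20));
        BiFunction<Integer, Event, Integer> sum = (acc, e) -> acc + e.value;

        System.out.println(foldPerKey(events, 0, sum)); // {a=3, b=30}
        System.out.println(foldShared(events, 0, sum)); // 33
    }
}
```

With per-key accumulators the two groups sum to 3 and 30; with a shared accumulator they collapse into a single 33, which is the "mashed together" result described above.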

Re: streaming GroupBy + Fold

Stephan Ewen
I think these operations were recently moved to the internal state
interface. Did the behavior change then?

@Marton or Gyula, can you comment? Is it per chance not mapped to the
partitioned state?

On Fri, Oct 2, 2015 at 6:37 PM, Martin Neumann <[hidden email]> wrote:

> Hej,
>
> In one of my programs I run a Fold on a GroupedDataStream. The aim is to
> aggregate the values in each group.
> It seems the aggregator in the Fold function is shared at the operator level,
> so all groups that end up on the same operator get mashed together.
>
> Is this the intended behavior? If so, what do I have to do to separate them?
>
>
> cheers Martin
>

Re: streaming GroupBy + Fold

Martin Neumann-2
One of my colleagues found it today while we were hunting bugs. We
were using the latest 0.10 version pulled from Maven this morning.
The program we were testing is new code, so I can't tell you if the behavior
has changed or if it was always like this.

On Fri, Oct 2, 2015 at 7:46 PM, Stephan Ewen <[hidden email]> wrote:

> I think these operations were recently moved to the internal state
> interface. Did the behavior change then?
>
> @Marton or Gyula, can you comment? Is it per chance not mapped to the
> partitioned state?

Re: streaming GroupBy + Fold

Márton Balassi
Hey,

Thanks for reporting the problem, Martin. I have not merged the PR Stephan
is referring to yet. [1] There I am cleaning up some of the internals too.
Just out of curiosity, could you share the code for the failing test please?

[1] https://github.com/apache/flink/pull/1155

On Fri, Oct 2, 2015 at 8:26 PM, Martin Neumann <[hidden email]> wrote:

> One of my colleagues found it today while we were hunting bugs. We
> were using the latest 0.10 version pulled from Maven this morning.
> The program we were testing is new code, so I can't tell you if the behavior
> has changed or if it was always like this.

Re: streaming GroupBy + Fold

Martin Neumann-2
Hej,

Sorry it took so long to respond; I needed to check whether I was actually allowed to share the code, since it uses internal datasets.

Attached to this email you will find the main class of this job, without the supporting classes or the actual dataset. If you want to run it, you need to replace the dataset with something else, but that should be trivial.
If you just want to see the problem itself, have a look at the attached log in conjunction with the code. Each ERROR printout in the log relates to an accumulator receiving wrong values.

cheers Martin

On Sat, Oct 3, 2015 at 11:29 AM, Márton Balassi <[hidden email]> wrote:
> Hey,
>
> Thanks for reporting the problem, Martin. I have not merged the PR Stephan
> is referring to yet. [1] There I am cleaning up some of the internals too.
> Just out of curiosity, could you share the code for the failing test please?
>
> [1] https://github.com/apache/flink/pull/1155



Attachment: TimeShiftTest.java (4K)
Attachment: TimeShiftTest.log (70K)

Re: streaming GroupBy + Fold

Márton Balassi
Thanks, I am checking it out tomorrow morning.

On Mon, Oct 5, 2015 at 9:59 PM, Martin Neumann <[hidden email]> wrote:

> Hej,
>
> Sorry it took so long to respond; I needed to check whether I was actually
> allowed to share the code, since it uses internal datasets.
>
> Attached to this email you will find the main class of this job, without
> the supporting classes or the actual dataset. If you want to run it, you
> need to replace the dataset with something else, but that should be
> trivial.
> If you just want to see the problem itself, have a look at the attached
> log in conjunction with the code. Each ERROR printout in the log relates to
> an accumulator receiving wrong values.
>
> cheers Martin

Re: streaming GroupBy + Fold

Márton Balassi
Martin, I have looked at your code and you are running a fold in a window.
That is a very important distinction: the code paths are separate.
Those code paths have recently been touched by Aljoscha, if I am not
mistaken.

Unfortunately, I mocked up a simple example and could not reproduce your
problem. [1] Could you maybe produce a minimal example that we
can actually execute? :)

[1]
https://github.com/mbalassi/flink/commit/9f1f02d05e2bc2043a8f514d39fbf7753ea7058d

On Mon, Oct 5, 2015 at 10:06 PM, Márton Balassi <[hidden email]>
wrote:

> Thanks, I am checking it out tomorrow morning.

Re: streaming GroupBy + Fold

Aljoscha Krettek-2
Hi,
If you are using a fold you are using none of the new code paths. I will
add support for Fold to the new windowing implementation today, though.

Cheers,
Aljoscha

On Mon, 5 Oct 2015 at 23:49 Márton Balassi <[hidden email]> wrote:

> Martin, I have looked at your code and you are running a fold in a window.
> That is a very important distinction: the code paths are separate.
> Those code paths have recently been touched by Aljoscha, if I am not
> mistaken.
>
> Unfortunately, I mocked up a simple example and could not reproduce your
> problem. [1] Could you maybe produce a minimal example that we
> can actually execute? :)
>
> [1]
>
> https://github.com/mbalassi/flink/commit/9f1f02d05e2bc2043a8f514d39fbf7753ea7058d

Re: streaming GroupBy + Fold

Martin Neumann-2
The window is actually part of the workaround we are currently using (I should
have commented it out), where we use a window and a MapFunction instead of a
Fold.
Originally I was running the fold without a window and faced the same problems.

The workaround works for now, so there is no urgency on that one. I just
wanted to make sure I was not doing something stupid and it was a bug that
you guys were aware of.

cheers Martin

On Tue, Oct 6, 2015 at 8:09 AM, Aljoscha Krettek <[hidden email]>
wrote:

> Hi,
> If you are using a fold you are using none of the new code paths. I will
> add support for Fold to the new windowing implementation today, though.
>
> Cheers,
> Aljoscha

Re: streaming GroupBy + Fold

Aljoscha Krettek-2
Hi,
I ran it using the attached TimeShift.java and I didn't get any key cross-talk. Could you please try my example, or verify that the problem still persists on your side?

I replaced the source by a source that just creates random strings.
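
For readers who want to run a similar check themselves, below is a rough plain-Java sketch of how key cross-talk can be detected with randomly generated string keys. It is not the TimeShift.java mentioned above and uses no Flink APIs; `CrossTalkCheck`, `TaggedCount`, `randomKeys`, and `countCrossTalk` are hypothetical names. The idea is that each accumulator remembers the key it was created for, and every element whose key differs from its accumulator's owner counts as an error, much like the ERROR printouts in the log posted earlier in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class CrossTalkCheck {

    /** Accumulator that remembers which key it was created for. */
    public static final class TaggedCount {
        public final String ownerKey;
        public final long count;

        public TaggedCount(String ownerKey, long count) {
            this.ownerKey = ownerKey;
            this.count = count;
        }
    }

    /** Hypothetical stand-in for a source that just creates random strings. */
    public static List<String> randomKeys(int n, int distinctKeys, long seed) {
        Random rnd = new Random(seed);
        List<String> keys = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            keys.add("key-" + rnd.nextInt(distinctKeys));
        }
        return keys;
    }

    /**
     * Counts elements that reach an accumulator owned by a different key.
     * With sharedAccumulator = true all keys use one state slot, which
     * simulates the behavior reported in this thread.
     */
    public static int countCrossTalk(List<String> keys, boolean sharedAccumulator) {
        Map<String, TaggedCount> state = new HashMap<>();
        int errors = 0;
        for (String key : keys) {
            String slot = sharedAccumulator ? "<shared>" : key;
            TaggedCount acc = state.get(slot);
            if (acc == null) {
                acc = new TaggedCount(key, 0);
            }
            if (!acc.ownerKey.equals(key)) {
                errors++; // this element was folded into another key's accumulator
            }
            state.put(slot, new TaggedCount(acc.ownerKey, acc.count + 1));
        }
        return errors;
    }

    public static void main(String[] args) {
        List<String> keys = randomKeys(1000, 5, 42L);
        System.out.println("per-key errors: " + countCrossTalk(keys, false)); // 0
        System.out.println("shared errors:  " + countCrossTalk(keys, true));
    }
}
```

A correctly partitioned fold should report zero errors regardless of the input; any non-zero count points to accumulators being shared across keys.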



On Tue, 6 Oct 2015 at 09:56 Martin Neumann <[hidden email]> wrote:
> The window is actually part of the workaround we are currently using (I should
> have commented it out), where we use a window and a MapFunction instead of a
> Fold.
> Originally I was running the fold without a window and faced the same problems.
>
> The workaround works for now, so there is no urgency on that one. I just
> wanted to make sure I was not doing something stupid and it was a bug that
> you guys were aware of.
>
> cheers Martin


Re: streaming GroupBy + Fold

Martin Neumann-2
Hej,

I checked the latest Flink trunk version together with Aljoscha and the
problems are gone now. (Just to close this discussion thread.)

cheers Martin

On Wed, Oct 7, 2015 at 1:21 PM, Aljoscha Krettek <[hidden email]>
wrote:

> Hi,
> I ran it using the attached TimeShift.java and I didn't get any key
> cross-talk. Could you please try my example, or verify that the problem
> still persists on your side?
>
> I replaced the source by a source that just creates random strings.

Re: streaming GroupBy + Fold

Márton Balassi
Thanks for the update.

On Wed, Oct 14, 2015 at 10:12 AM, Martin Neumann <[hidden email]> wrote:

> Hej,
>
> I checked the latest Flink trunk version together with Aljoscha and the
> problems are gone now. (Just to close this discussion thread.)
>
> cheers Martin