Thoughts About Streaming

38 messages
Thoughts About Streaming

Aljoscha Krettek
Hi,
with people proposing changes to the streaming part I also wanted to throw
my hat into the ring. :D

During the last few months, while I was getting acquainted with the
streaming system, I wrote down some thoughts I had about how things could
be improved. Hopefully, they are in somewhat coherent shape now, so please
have a look if you are interested in this:
https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing

This mostly covers:
 - Timestamps assigned at sources
 - Out-of-order processing of elements in window operators
 - API design

Please let me know what you think. Comment in the document or here in the
mailing list.

I have a PR in the makings that would introduce source timestamps and
watermarks for keeping track of them. I also hacked a proof-of-concept of a
windowing system that is able to process out-of-order elements using a
FlatMap operator. (It uses panes to perform efficient pre-aggregations.)
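Since the PR isn't out yet, here is a minimal, Flink-independent sketch of the two ideas combined (Python, all names mine, and simplified to one pane per window): elements carry source-assigned timestamps and are pre-aggregated into panes on arrival, even out of order; a watermark then triggers emission of every window that can no longer receive elements.

```python
from collections import defaultdict

class PaneWindowCounter:
    """Counts elements per tumbling window of `size` time units.

    Elements carry event timestamps and may arrive out of order; a pane
    holds the running pre-aggregate until a watermark passes the window end.
    """
    def __init__(self, size):
        self.size = size
        self.panes = defaultdict(int)   # window start -> running count
        self.results = []               # emitted (window_start, count) pairs

    def element(self, timestamp):
        start = (timestamp // self.size) * self.size
        self.panes[start] += 1          # pre-aggregate immediately on arrival

    def watermark(self, wm):
        # A watermark `wm` promises that no element with timestamp < wm
        # will arrive, so every window ending at or before wm is complete.
        for start in sorted(s for s in self.panes if s + self.size <= wm):
            self.results.append((start, self.panes.pop(start)))

op = PaneWindowCounter(size=5)
for ts in [1, 7, 3, 2, 6]:   # note: timestamps arrive out of order
    op.element(ts)
op.watermark(5)               # window [0, 5) is now complete
```

This is just the shape of the idea, not the PR's actual code; the real thing has to deal with keying, multiple panes per window, and state.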

Cheers,
Aljoscha

Re: Thoughts About Streaming

Gyula Fóra
Hi Aljoscha,

Thanks for the nice summary, this is a very good initiative.

I added some comments to the respective sections (where I didn't fully
agree :)).
At some point I think it would be good to have a public hangout session on
this, which could make a more dynamic discussion.

Cheers,
Gyula

Aljoscha Krettek <[hidden email]> wrote (on Mon, 22 Jun 2015, 21:34):


Re: Thoughts About Streaming

Aljoscha Krettek
The reason I posted this now is that we need to think about the API and
windowing before proceeding with the PRs of Gabor (inverse reduce) and
Gyula (removal of "aggregate" functions on DataStream).

For the windowing, I think that the current model does not work for
out-of-order processing. Therefore, the whole windowing infrastructure will
basically have to be redone, which also means that any work we do now on
the pre-aggregators or optimizations becomes useless.

For the API, I proposed to restructure the interactions between all the
different *DataStream classes and grouping/windowing. (See API section of
the doc I posted.)

On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <[hidden email]> wrote:


Re: Thoughts About Streaming

Stephan Ewen
For the windowing designs, we should also have in mind what requirements we
have on the way we keep/store the elements (in external stores, Flink
managed memory, ...)

On Tue, Jun 23, 2015 at 9:55 AM, Aljoscha Krettek <[hidden email]>
wrote:


Re: Thoughts About Streaming

Gyula Fóra
In reply to this post by Aljoscha Krettek
Hey

I think we should not block PRs unnecessarily if your suggested changes
might touch them at some point.

Also, I still think we should not put everything in the DataStream, because
it will be a huge mess.

We also need to agree on the out-of-order processing: whether we want it
the way you proposed it (which is quite costly). An alternative approach
that fits in the current windowing is to filter out the out-of-order events
and apply a special handling operator to them. This would be fairly
lightweight.
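If I understand the filtering alternative right, it could be sketched like this (illustrative Python, not Flink API; the names are mine): track the highest timestamp seen so far and divert anything arriving behind it to the special handling operator.

```python
def split_out_of_order(stream):
    """Partition a timestamped stream into elements that are still in
    order and elements that arrive behind the high-water mark; the latter
    would be routed to a dedicated handling operator."""
    max_seen = float("-inf")
    in_order, late = [], []
    for ts, value in stream:
        if ts >= max_seen:
            max_seen = ts
            in_order.append((ts, value))   # feeds the normal window path
        else:
            late.append((ts, value))       # feeds the special operator
    return in_order, late

in_order, late = split_out_of_order([(1, "a"), (3, "b"), (2, "c"), (4, "d")])
```

The cheap part is the split itself; what the special operator then does with the late elements is the open question.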

My point is that we need to consider some alternative solutions. And we
should not block contributions along the way.

Cheers
Gyula

On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <[hidden email]>
wrote:


Re: Thoughts About Streaming

Matthias J. Sax
I agree that there should be multiple alternatives the user(!) can
choose from. Partial out-of-order processing works for many/most
aggregates. However, if you consider event-pattern matching, global
ordering is necessary (even if the performance penalty might be high).

I would also keep "system-time windows" as an alternative to "source
assigned ts-windows".

It might also be interesting to consider the following paper for
overlapping windows: "Resource sharing in continuous sliding-window
aggregates"

https://dl.acm.org/citation.cfm?id=1316720

-Matthias

On 06/23/2015 10:37 AM, Gyula Fóra wrote:




Re: Thoughts About Streaming

Stephan Ewen
What I like a lot about Aljoscha's proposed design is that we need no
different code for "system time" vs. "event time". It only differs in where
the timestamps are assigned.

The out-of-order processing approach also gives you the semantics of total
ordering without imposing merges on the streams.
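To make that point concrete, a toy sketch (Python, all names mine, nothing Flink-specific): the windowing logic is identical for both time domains; only the function that attaches timestamps changes.

```python
import time

def with_event_time(records, extract_ts):
    # Event time: the timestamp travels with the record,
    # assigned at the source.
    return [(extract_ts(r), r) for r in records]

def with_system_time(records, clock=time.time):
    # System time: the timestamp is whatever the clock says
    # when the record arrives.
    return [(clock(), r) for r in records]

def tumbling_window_counts(timestamped, size):
    # Identical windowing code for both time domains.
    counts = {}
    for ts, _ in timestamped:
        start = int(ts // size) * size
        counts[start] = counts.get(start, 0) + 1
    return counts

counts = tumbling_window_counts(
    with_event_time([{"ts": 1}, {"ts": 7}, {"ts": 3}], lambda r: r["ts"]),
    size=5)
```

Swapping `with_event_time` for `with_system_time` leaves the windowing operator untouched, which is exactly the property the proposed design has.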

On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
[hidden email]> wrote:


Re: Thoughts About Streaming

Aljoscha Krettek
I also don't like big changes but sometimes they are necessary. The reason
why I'm so adamant about out-of-order processing is that out-of-order
elements are not some exception that occurs once in a while; they occur
constantly in a distributed system. For example, in this:
https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting windows
are completely bogus because the current windowing system assumes elements
to globally arrive in order, which is simply not true. (The example has a
source that generates increasing integers. Then these pass through a map
and are unioned with the original DataStream before a window operator.)
This simulates elements arriving from different operators at a windowing
operator. The example also runs at DOP=1; I imagine this gets worse with
higher DOP.
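The effect in the gist can be simulated without Flink (illustrative Python; the latency value is made up): merging a stream with a slightly slower mapped copy of itself is already enough to destroy timestamp order at the window operator.

```python
def arrival_order(source, map_latency=2):
    """Simulate a stream unioned with a mapped copy of itself: each
    element arrives at the window operator twice, once directly and once
    via a map that adds some processing latency."""
    direct = [(ts, ts) for ts in source]                # (arrival, timestamp)
    mapped = [(ts + map_latency, ts) for ts in source]  # delayed copies
    merged = sorted(direct + mapped)                    # sort by arrival time
    return [ts for _, ts in merged]                     # timestamps as seen

order = arrival_order([1, 2, 3])
```

Both inputs are perfectly ordered on their own; it is only their interleaving at the downstream operator that is not, which is the situation the current windowing model assumes away.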

What do you mean by costly? As I said, I have a proof-of-concept windowing
operator that can handle out-of-order elements. This is an example using
the current Flink API: https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
(It is an infinite source of tuples and a 5-second window operator that
counts the tuples.) The first problem is that this code deadlocks because
of the thread that emits fake elements. If I disable the fake-element code
it works, but the throughput using my mockup is four times higher. The gap
widens dramatically as the window size increases.

So, it actually increases performance (unless I'm making a mistake in my
explorations) and can handle elements that arrive out of order (which
happens basically always in real-world windowing use cases).


On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <[hidden email]> wrote:


Re: Thoughts About Streaming

Ted Dunning
Out of order is ubiquitous in the real-world.  Typically, what happens is
that businesses will declare a maximum allowable delay for delayed
transactions and will commit to results when that delay is reached.
Transactions that arrive later than this cutoff are collected specially as
corrections which are reported/used when possible.

Clearly, ordering can also be violated during processing, but if the data
is originally out of order, the situation can't be repaired by protocol
fixes that prevent transactions from becoming disordered; it has to be
handled at the data level.
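That business policy is easy to state as code (illustrative Python; the field names and cutoff value are mine): commit what arrives within the allowable delay, collect everything later as corrections.

```python
def partition_by_cutoff(transactions, max_delay):
    """Each transaction is (arrival_time, event_time, value). A transaction
    arriving within `max_delay` of its event time goes into the committed
    results; anything later is collected separately as a correction, to be
    reported/used when possible."""
    committed, corrections = [], []
    for arrival, event_ts, value in transactions:
        if arrival - event_ts <= max_delay:
            committed.append((event_ts, value))
        else:
            corrections.append((event_ts, value))
    return committed, corrections

committed, corrections = partition_by_cutoff(
    [(10, 9, "a"), (10, 3, "b"), (12, 11, "c")], max_delay=5)
```

The interesting design question for a streaming system is what the corrections channel feeds into, not whether the split exists.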




On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <[hidden email]>
wrote:


Re: Thoughts About Streaming

Kostas Tzoumas-2
I agree with supporting out-of-order processing out of the box :-), even if this means
a major refactoring. This is the right time to refactor the streaming API,
before we pull it out of beta. I think that this is more important than new
features in the streaming API, which can be prioritized once the API is out
of beta (meaning that, IMO, this is the right time to stall PRs until we
agree on the design).

There are three sections in the document: windowing, state, and API. How
intertwined are they with each other? Can we separate the discussions, or do
we need to discuss them all together? I think part of the difficulty is
that we are discussing three design choices at once.


Re: Thoughts About Streaming

Gyula Fóra
I agree, let's separate these topics from each other so we can get faster
resolution.

There is already a state discussion in the thread we started with Paris.


Re: Thoughts About Streaming

Aljoscha Krettek-2
I think I'll have to elaborate a bit, so I created a proof-of-concept
implementation of my ideas and ran some throughput measurements to
alleviate concerns about performance.

First, though, I want to highlight again why the current approach does not
work with out-of-order elements (which, again, occur constantly due to the
distributed nature of the system). This is the example I posted earlier:
https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks like
this:

   +--------+
   | Source |
   +--------+
        |
   +----+---------+
   |              |
   |      +--------------+
   |      | Identity Map |
   |      +--------------+
   |              |
   +----+---------+
        |
   +--------+
   | Window |
   +--------+
        |
   +--------+
   |  Sink  |
   +--------+

So all it does is pass the elements through an identity map and then merge
them again with the original stream before the window operator. The source
emits ascending integers, and the window operator has a custom timestamp
extractor that uses the integer itself as the timestamp and should create
windows of size 4 (that is, elements with timestamps 0-3 form one window,
elements with timestamps 4-7 the next, and so on). Since the topology
basically doubles the elements from the source, I would expect to get these
windows:
Window: 0, 0, 1, 1, 2, 2, 3, 3
Window: 4, 4, 5, 5, 6, 6, 7, 7

The output is this, however:
Window: 0, 1, 2, 3,
Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
Window: 8, 9, 10, 11,
Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
Window: 16, 17, 18, 19,
Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
Window: 24, 25, 26, 27,
Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,

The reason is that the elements simply arrive out-of-order. Imagine what
would happen if the elements actually arrived with some delay from
different operators.
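The failure mode can be reproduced with a toy model (this is a sketch for illustration, not Flink's actual operator): a windower that assumes globally ordered timestamps and closes the current window as soon as it sees an element whose timestamp is past the window end. Feeding it an interleaved union stream yields the same kind of bogus windows as above.

```python
WINDOW_SIZE = 4

def inorder_windows(stream):
    """Window by timestamp, wrongly assuming the stream is globally ordered."""
    windows, current, end = [], [], WINDOW_SIZE
    for ts in stream:
        if ts >= end:  # treats "timestamp past window end" as "window over" --
                       # only valid if elements arrive in timestamp order
            windows.append(current)
            current, end = [], end + WINDOW_SIZE
        current.append(ts)
    windows.append(current)
    return windows

# Two copies of 0..7 interleaved with a lag on the second copy, mimicking a
# source unioned with its identity-mapped twin.
stream = [0, 1, 2, 3, 0, 4, 1, 5, 2, 6, 3, 7, 4, 5, 6, 7]
print(inorder_windows(stream))
# -> [[0, 1, 2, 3, 0], [4, 1, 5, 2, 6, 3, 7, 4, 5, 6, 7]]
```

The first "window" contains 0 twice but misses the delayed copies of 1, 2, and 3, which leak into the next window instead.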

Now, on to the performance numbers. The proof-of-concept I created is
available here:
https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The basic
idea is that sources assign the current timestamp when emitting elements.
They also periodically emit watermarks that tell us that no elements with
an earlier timestamp will be emitted. The watermarks propagate through the
operators. The window operator looks at the timestamp of an element and
puts it into the buffer that corresponds to that window. When the window
operator receives a watermark it will look at the in-flight windows
(basically the buffers) and emit those windows where the window-end is
before the watermark.
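The buffering-plus-watermark mechanism described above can be sketched like this (a minimal toy model of the proof-of-concept's idea, not its actual code; `WindowOperator` and its methods are hypothetical names): elements are assigned to per-window buffers by timestamp, and a watermark triggers emission of every window whose end lies at or before it.

```python
from collections import defaultdict

class WindowOperator:
    def __init__(self, window_size):
        self.window_size = window_size
        self.buffers = defaultdict(list)  # window start -> buffered elements
        self.emitted = []

    def process_element(self, timestamp, value):
        # Assign the element to the window covering its timestamp,
        # regardless of arrival order.
        start = (timestamp // self.window_size) * self.window_size
        self.buffers[start].append(value)

    def process_watermark(self, watermark):
        # A watermark promises that no element with an earlier timestamp
        # will arrive, so windows ending at or before it are complete.
        for start in sorted(list(self.buffers)):
            if start + self.window_size - 1 <= watermark:
                self.emitted.append(sorted(self.buffers.pop(start)))

op = WindowOperator(4)
# Elements arrive out of order, e.g. after a union of two streams.
for ts in [0, 1, 0, 2, 1, 3, 2, 4, 3, 5]:
    op.process_element(ts, ts)
op.process_watermark(3)  # all elements with timestamp <= 3 have arrived
print(op.emitted)  # [[0, 0, 1, 1, 2, 2, 3, 3]]
```

Note that the window for timestamps 4-7 stays buffered until a later watermark covers it.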

For measuring throughput I did the following: the source emits tuples of
the form ("tuple", 1) in an infinite loop. The window operator sums up the
tuples, thereby counting how many tuples it can handle in a given time
window. There are two different implementations of the summation: 1) simply
summing up the values in a mapWindow(), where you get a List of all tuples
and iterate over it; 2) using sum(1), which is implemented as a reduce()
(and uses the pre-reducer optimisations).

These are the performance numbers (Current is the current implementation,
Next is my proof-of-concept):

Tumbling (1 sec):
 - Current/Map: 1.6 mio
 - Current/Reduce: 2 mio
 - Next/Map: 2.2 mio
 - Next/Reduce: 4 mio

Sliding (5 sec, slide 1 sec):
 - Current/Map: ca 3 mio (fluctuates a lot)
 - Current/Reduce: No output
 - Next/Map: ca 4 mio (fluctuates)
 - Next/Reduce: 10 mio

The Next/Reduce variant can basically scale indefinitely with window size
because the internal state does not depend on the number of elements (it is
just the current sum). The pre-reducer for sliding windows cannot handle
this amount of tuples; it produces no output. For the two Map variants the
performance fluctuates because they always keep all the elements in an
internal buffer before emission; this seems to tax the garbage collector a
bit and leads to random pauses.
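The difference between the two variants comes down to state size (a language-agnostic sketch; the class names below are illustrative, not Flink API): a reduce keeps one running aggregate per in-flight window, while a mapWindow-style buffer must retain every element until the window fires.

```python
class ReducingWindow:
    """Pre-aggregates on arrival: state is a single value, independent of count."""
    def __init__(self, reduce_fn):
        self.reduce_fn = reduce_fn
        self.acc = None

    def add(self, value):
        self.acc = value if self.acc is None else self.reduce_fn(self.acc, value)

class BufferingWindow:
    """Buffers everything: state grows linearly with the number of elements."""
    def __init__(self):
        self.elements = []

    def add(self, value):
        self.elements.append(value)

r, b = ReducingWindow(lambda a, c: a + c), BufferingWindow()
for _ in range(1000):
    r.add(1)
    b.add(1)
print(r.acc, len(b.elements))  # 1000 1000 -- same sum, very different state size
```

Both produce the same sum, but the buffering variant holds 1000 objects that the garbage collector must eventually reclaim, which matches the fluctuation seen in the Map measurements.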

One thing that should be noted is that I had to disable the fake-element
emission thread; otherwise the Current versions would deadlock.

So, I started working on this because I thought that out-of-order
processing would be necessary for correctness. And it certainly is. But the
proof-of-concept also shows that performance can be greatly improved.


Re: Thoughts About Streaming

Ufuk Celebi-2
Thanks for writing this up and comparing to the current implementation. It's great to see that your mockup indicates correct/expected behaviour *and* better performance. :-)

Regarding the results for the current mechanism: does this problem affect all window operators?

– Ufuk

On 25 Jun 2015, at 11:36, Aljoscha Krettek <[hidden email]> wrote:

> I think I'll have to elaborate a bit so I created a proof-of-concept
> implementation of my ideas and ran some throughput measurements to
> alleviate concerns about performance.
>
> First, though, I want to highlight again why the current approach does not
> work with out-of-order elements (which, again, occur constantly due to the
> distributed nature of the system). This is the example I posted earlier:
> https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks like
> this:
>
> +--+
> |  | Source
> +--+
>  |
>  +--------+
>  |        |
>  |      +--+
>  |      |  | Identity Map
>  |      +--+
>  |        |
>  +--------+
>  |
> +--+
> |  | Window
> +--+
>  |
> +--+
> |  | Sink
> +--+
>
> So all it does is pass the elements through an identity map and then merge
> them again before the window operator. The source emits ascending integers
> and the window operator has a custom timestamp extractor that uses the
> integer itself as the timestamp and should create windows of size 4 (that
> is elements with timestamp 0-3 are one window, the next are the elements
> with timestamps 4-7, and so on). Since the topology basically doubles the
> elements from the source I would expect to get these windows:
> Window: 0, 0, 1, 1, 2, 2, 3, 3
> Window: 4, 4, 5, 5, 6, 6, 7, 7
>
> The output is this, however:
> Window: 0, 1, 2, 3,
> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
> Window: 8, 9, 10, 11,
> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
> Window: 16, 17, 18, 19,
> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
> Window: 24, 25, 26, 27,
> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
>
> The reason is that the elements simply arrive out-of-order. Imagine what
> would happen if the elements actually arrived with some delay from
> different operations.
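[Editor's note: the expected output above relies on window membership being a pure function of the element's timestamp, not of its arrival position. A minimal sketch of that assignment (hypothetical helper names, not the proof-of-concept code), using the integer-as-timestamp convention from the example:]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EventTimeAssign {
    // Start of the tumbling window of the given size containing the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Group elements into windows by timestamp; arrival order is irrelevant.
    static Map<Long, List<Long>> assign(List<Long> elements, long size) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long e : elements) {
            windows.computeIfAbsent(windowStart(e, size), k -> new ArrayList<>()).add(e);
        }
        return windows;
    }
}
```

With the doubled, interleaved stream from the example, `assign(..., 4)` groups all eight copies of timestamps 0-3 into the window starting at 0, no matter how the two branches interleave.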
>
> Now, on to the performance numbers. The proof-of-concept I created is
> available here:
> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The basic
> idea is that sources assign the current timestamp when emitting elements.
> They also periodically emit watermarks that tell us that no elements with
> an earlier timestamp will be emitted. The watermarks propagate through the
> operators. The window operator looks at the timestamp of an element and
> puts it into the buffer that corresponds to that window. When the window
> operator receives a watermark it will look at the in-flight windows
> (basically the buffers) and emit those windows where the window-end is
> before the watermark.
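[Editor's note: the buffering-and-watermark logic in the paragraph above can be sketched roughly like this (hypothetical class, not the actual operator on the branch): elements land in per-window buffers, and a watermark flushes every window whose end is at or below it.]

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WatermarkWindows {
    final long size;
    // window start -> buffered element timestamps (the in-flight windows)
    final TreeMap<Long, List<Long>> inFlight = new TreeMap<>();
    final List<List<Long>> emitted = new ArrayList<>();

    WatermarkWindows(long size) { this.size = size; }

    void element(long timestamp) {
        long start = timestamp - (timestamp % size);
        inFlight.computeIfAbsent(start, k -> new ArrayList<>()).add(timestamp);
    }

    // A watermark promises that no element with an earlier timestamp will
    // arrive, so every window ending at or before it is complete.
    void watermark(long wm) {
        Iterator<Map.Entry<Long, List<Long>>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> e = it.next();
            if (e.getKey() + size <= wm) {
                emitted.add(e.getValue());
                it.remove();
            }
        }
    }
}
```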
>
> For measuring throughput I did the following: The source emits tuples of
> the form ("tuple", 1) in an infinite loop. The window operator sums up the
> tuples, thereby counting how many tuples the window operator can handle in
> a given time window. There are two different implementations for the
> summation: 1) simply summing up the values in a mapWindow(), where you get
> a List of all tuples and simply iterate over it; 2) using sum(1), which is
> implemented as a reduce() (that uses the pre-reducer optimisations).
>
> These are the performance numbers (Current is the current implementation,
> Next is my proof-of-concept):
>
> Tumbling (1 sec):
> - Current/Map: 1.6 mio
> - Current/Reduce: 2 mio
> - Next/Map: 2.2 mio
> - Next/Reduce: 4 mio
>
> Sliding (5 sec, slide 1 sec):
> - Current/Map: ca 3 mio (fluctuates a lot)
> - Current/Reduce: No output
> - Next/Map: ca 4 mio (fluctuates)
> - Next/Reduce: 10 mio
>
> The Next/Reduce variant can basically scale indefinitely with window size
> because the internal state does not rely on the number of elements (it is
> just the current sum). The pre-reducer for sliding windows cannot handle
> the amount of tuples, it produces no output. For the two Map variants the
> performance fluctuates because they always keep all the elements in an
> internal buffer before emission, this seems to tax the garbage collector a
> bit and leads to random pauses.
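[Editor's note: the two summation styles differ in the state they keep, which explains both the constant-memory scaling of Next/Reduce and the GC pressure of the Map variants. In plain Java terms (illustrative sketch only, not the benchmark code):]

```java
import java.util.List;

public class SummationVariants {
    // Variant 1, mapWindow-style: buffer everything, sum at emission time.
    // Memory grows with the window; the buffer must be garbage-collected
    // after each emission.
    static long sumByMapWindow(List<Long> windowBuffer) {
        long sum = 0;
        for (long v : windowBuffer) sum += v;
        return sum;
    }

    // Variant 2, reduce-style: fold each element into a running value.
    // State is a single long regardless of window size.
    static long sumByReduce(long runningSum, long next) {
        return runningSum + next;
    }
}
```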
>
> One thing that should be noted is that I had to disable the fake-element
> emission thread, otherwise the Current versions would deadlock.
>
> So, I started working on this because I thought that out-of-order
> processing would be necessary for correctness. And it certainly is. But the
> proof-of-concept also shows that performance can be greatly improved.
>
> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <[hidden email]> wrote:
>>
>> I agree, let's separate these topics from each other so we can get faster
>> resolution.
>>
>> There is already a state discussion in the thread we started with Paris.
>>
>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <[hidden email]> wrote:
>>
>>> I agree with supporting out-of-order out of the box :-), even if this means
>>> a major refactoring. This is the right time to refactor the streaming API
>>> before we pull it out of beta. I think that this is more important than new
>>> features in the streaming API, which can be prioritized once the API is out
>>> of beta (meaning, that IMO this is the right time to stall PRs until we
>>> agree on the design).
>>>
>>> There are three sections in the document: windowing, state, and API. How
>>> convoluted are those with each other? Can we separate the discussion or do
>>> we need to discuss those all together? I think part of the difficulty is
>>> that we are discussing three design choices at once.
>>>
>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <[hidden email]> wrote:
>>>
>>>> Out of order is ubiquitous in the real world. Typically, what happens is
>>>> that businesses will declare a maximum allowable delay for delayed
>>>> transactions and will commit to results when that delay is reached.
>>>> Transactions that arrive later than this cutoff are collected specially as
>>>> corrections which are reported/used when possible.
>>>>
>>>> Clearly, ordering can also be violated during processing, but if the data
>>>> is originally out of order the situation can't be repaired by any protocol
>>>> fixes that prevent transactions from becoming disordered; it has to be
>>>> handled at the data level.
>>>>
>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <[hidden email]> wrote:
>>>>
>>>>> I also don't like big changes but sometimes they are necessary. The reason
>>>>> why I'm so adamant about out-of-order processing is that out-of-order
>>>>> elements are not some exception that occurs once in a while; they occur
>>>>> constantly in a distributed system. For example, in this:
>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting windows
>>>>> are completely bogus because the current windowing system assumes elements
>>>>> to globally arrive in order, which is simply not true. (The example has a
>>>>> source that generates increasing integers. Then these pass through a map
>>>>> and are unioned with the original DataStream before a window operator.)
>>>>> This simulates elements arriving from different operators at a windowing
>>>>> operator. The example is also DOP=1; I imagine this gets worse with
>>>>> higher DOP.
>>>>>
>>>>> What do you mean by costly? As I said, I have a proof-of-concept windowing
>>>>> operator that can handle out-of-order elements. This is an example using
>>>>> the current Flink API:
>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
>>>>> (It is an infinite source of tuples and a 5 second window operator that
>>>>> counts the tuples.) The first problem is that this code deadlocks because
>>>>> of the thread that emits fake elements. If I disable the fake element code
>>>>> it works, but the throughput using my mockup is 4 times higher. The gap
>>>>> widens dramatically if the window size increases.
>>>>>
>>>>> So, it actually increases performance (unless I'm making a mistake in my
>>>>> explorations) and can handle elements that arrive out-of-order (which
>>>>> happens basically always in real-world windowing use-cases).
>>>>>
>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <[hidden email]> wrote:
>>>>>
>>>>>> What I like a lot about Aljoscha's proposed design is that we need no
>>>>>> different code for "system time" vs. "event time". It only differs in
>>>>>> where the timestamps are assigned.
>>>>>>
>>>>>> The OOP approach also gives you the semantics of total ordering without
>>>>>> imposing merges on the streams.
>>>>>>
>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <[hidden email]> wrote:
>>>>>>
>>>>>>> I agree that there should be multiple alternatives the user(!) can
>>>>>>> choose from. Partial out-of-order processing works for many/most
>>>>>>> aggregates. However, if you consider Event-Pattern-Matching, global
>>>>>>> ordering is necessary (even if the performance penalty might be high).
>>>>>>>
>>>>>>> I would also keep "system-time windows" as an alternative to "source
>>>>>>> assigned ts-windows".
>>>>>>>
>>>>>>> It might also be interesting to consider the following paper for
>>>>>>> overlapping windows: "Resource sharing in continuous sliding-window
>>>>>>> aggregates"
>>>>>>>
>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>>>>>>>> Hey
>>>>>>>>
>>>>>>>> I think we should not block PRs unnecessarily if your suggested changes
>>>>>>>> might touch them at some point.
>>>>>>>>
>>>>>>>> Also I still think we should not put everything in the DataStream
>>>>>>>> because it will be a huge mess.
>>>>>>>>
>>>>>>>> Also we need to agree on the out-of-order processing, whether we want
>>>>>>>> it the way you proposed it (which is quite costly). Another alternative
>>>>>>>> approach there which fits in the current windowing is to filter out
>>>>>>>> out-of-order events and apply a special handling operator on them. This
>>>>>>>> would be fairly lightweight.
>>>>>>>>
>>>>>>>> My point is that we need to consider some alternative solutions. And we
>>>>>>>> should not block contributions along the way.
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> The reason I posted this now is that we need to think about the API and
>>>>>>>>> windowing before proceeding with the PRs of Gabor (inverse reduce) and
>>>>>>>>> Gyula (removal of "aggregate" functions on DataStream).
>>>>>>>>>
>>>>>>>>> For the windowing, I think that the current model does not work for
>>>>>>>>> out-of-order processing. Therefore, the whole windowing infrastructure
>>>>>>>>> will basically have to be redone. Meaning also that any work on the
>>>>>>>>> pre-aggregators or optimizations that we do now becomes useless.
>>>>>>>>>
>>>>>>>>> For the API, I proposed to restructure the interactions between all the
>>>>>>>>> different *DataStream classes and grouping/windowing. (See API section
>>>>>>>>> of the doc I posted.)


Re: Thoughts About Streaming

Gábor Gévay
In reply to this post by Aljoscha Krettek-2
Hello,

Aljoscha, can you please try the performance test of Current/Reduce
with the InversePreReducer in PR 856? (If you just call sum, it will
use an InversePreReducer.) It would be an interesting test, because
the inverse function optimization really depends on the stream being
ordered, and I think it has the potential of being faster then
Next/Reduce. Especially if the window size is much larger than the
slide size.

Best regards,
Gabor
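[Editor's note: for context, an inverse pre-reducer maintains a sliding aggregate by applying the reduce function to arriving elements and its inverse to evicted ones, instead of re-aggregating the whole buffer. A toy count-based version for sum (hypothetical sketch, not PR 856's actual implementation):]

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class InverseSumWindow {
    final int size;                // window size in elements, for simplicity
    final Deque<Long> buffer = new ArrayDeque<>();
    long sum = 0;

    InverseSumWindow(int size) { this.size = size; }

    // O(1) per element: apply the reduce function (+) to the newcomer and
    // its inverse (-) to the element that slides out of the window.
    long add(long value) {
        buffer.addLast(value);
        sum += value;
        if (buffer.size() > size) {
            sum -= buffer.removeFirst();  // inverse of the reduce function
        }
        return sum;
    }
}
```

The O(1)-per-element update is why this can beat full re-reduction when the window is much larger than the slide, but the subtraction is only correct if removeFirst() really returns the element that left the window, i.e. if the stream is ordered.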



Re: Thoughts About Streaming

Aljoscha Krettek-2
Hi,
I also ran the tests on top of PR 856 (inverse reducer) now. The results
seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all
the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 sec)
(theoretically there would be 5000 tuples in 5 seconds; the shortfall is due to
overhead). These are the results for the inverse reduce optimisation:
(Tuple 0,38)
(Tuple 0,829)
(Tuple 0,1625)
(Tuple 0,2424)
(Tuple 0,3190)
(Tuple 0,3198)
(Tuple 0,-339368)
(Tuple 0,-1315725)
(Tuple 0,-2932932)
(Tuple 0,-5082735)
(Tuple 0,-7743256)
(Tuple 0,75701046)
(Tuple 0,642829470)
(Tuple 0,2242018381)
(Tuple 0,5190708618)
(Tuple 0,10060360311)
(Tuple 0,-94254951)
(Tuple 0,-219806321293)
(Tuple 0,-1258895232699)
(Tuple 0,-4074432596329)

One line is one emitted window count. This is what happens when I remove
the Thread.sleep(1):
(Tuple 0,660676)
(Tuple 0,2553733)
(Tuple 0,3542696)
(Tuple 0,1)
(Tuple 0,1107035)
(Tuple 0,2549491)
(Tuple 0,4100387)
(Tuple 0,-8406583360092)
(Tuple 0,-8406582150743)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,6847279255682044995)
(Tuple 0,6847279255682044995)
(Tuple 0,-5390528042713628318)
(Tuple 0,-5390528042711551780)
(Tuple 0,-5390528042711551780)

So at some point the pre-reducer seems to go haywire and does not recover
from it. The good thing is that it does produce results now, where the
previous Current/Reduce would simply hang and not produce any output.
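[Editor's note: the never-recovering behaviour follows from how inverse reduction maintains state: only deltas are ever applied, never a recomputation from the buffered elements. A toy illustration (hypothetical code, not the PR's) of how a single wrong eviction, e.g. caused by out-of-order arrival, shifts every later window result permanently:]

```java
public class InverseReduceDrift {
    // Inverse reduction keeps the window sum purely by deltas:
    //   state' = state + incoming - evicted
    // Nothing ever rebuilds the state from the window contents, so an
    // error in "evicted" is carried into every subsequent result.
    static long[] run(long[] incoming, long[] evicted) {
        long[] results = new long[incoming.length];
        long state = 0;
        for (int i = 0; i < incoming.length; i++) {
            state = state + incoming[i] - evicted[i];
            results[i] = state;
        }
        return results;
    }
}
```

If one step evicts 5 where the correct departing element was 1, every later result is off by the same 4; there is no step that corrects it.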

On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <[hidden email]> wrote:

> Hello,
>
> Aljoscha, can you please try the performance test of Current/Reduce
> with the InversePreReducer in PR 856? (If you just call sum, it will
> use an InversePreReducer.) It would be an interesting test, because
> the inverse function optimization really depends on the stream being
> ordered, and I think it has the potential of being faster then
> Next/Reduce. Especially if the window size is much larger than the
> slide size.
>
> Best regards,
> Gabor
>
>
> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <[hidden email]>:
> > I think I'll have to elaborate a bit so I created a proof-of-concept
> > implementation of my Ideas and ran some throughput measurements to
> > alleviate concerns about performance.
> >
> > First, though, I want to highlight again why the current approach does
> not
> > work with out-of-order elements (which, again, occur constantly due to
> the
> > distributed nature of the system). This is the example I posted earlier:
> > https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks
> like
> > this:
> >
> > +--+
> > | | Source
> > +--+
> > |
> > +-----+
> > | |
> > | +--+
> > | | | Identity Map
> > | +--+
> > | |
> > +-----+
> > |
> > +--+
> > | | Window
> > +--+
> > |
> > |
> > +--+
> > | | Sink
> > +--+
> >
> > So all it does is pass the elements through an identity map and then
> merge
> > them again before the window operator. The source emits ascending
> integers
> > and the window operator has a custom timestamp extractor that uses the
> > integer itself as the timestamp and should create windows of size 4 (that
> > is, elements with timestamp 0-3 are one window, the next are the elements
> > with timestamp 4-7, and so on). Since the topology basically doubles the
> > elements from the source, I would expect to get these windows:
> > Window: 0, 0, 1, 1, 2, 2, 3, 3
> > Window: 4, 4, 5, 5, 6, 6, 7, 7
> >
> > The output is this, however:
> > Window: 0, 1, 2, 3,
> > Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
> > Window: 8, 9, 10, 11,
> > Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
> > Window: 16, 17, 18, 19,
> > Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
> > Window: 24, 25, 26, 27,
> > Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
> >
> > The reason is that the elements simply arrive out-of-order. Imagine what
> > would happen if the elements actually arrived with some delay from
> > different operations.
> >
> > Now, on to the performance numbers. The proof-of-concept I created is
> > available here:
> > https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The
> basic
> > idea is that sources assign the current timestamp when emitting elements.
> > They also periodically emit watermarks that tell us that no elements with
> > an earlier timestamp will be emitted. The watermarks propagate through
> the
> > operators. The window operator looks at the timestamp of an element and
> > puts it into the buffer that corresponds to that window. When the window
> > operator receives a watermark it will look at the in-flight windows
> > (basically the buffers) and emit those windows where the window-end is
> > before the watermark.
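The buffering-and-watermark mechanism described above can be sketched roughly like this (class and method names are illustrative, not the API of the proof-of-concept branch): elements are buffered per window, and a watermark flushes every window whose end it has passed, regardless of the order in which the elements arrived.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of a watermark-driven window operator: buffer elements by window,
// emit a window once a watermark passes the window end. Illustrative only.
class WatermarkWindowOperator {
    private final long windowSize;
    // window start timestamp -> elements buffered for that window
    private final TreeMap<Long, List<Long>> inFlight = new TreeMap<>();

    WatermarkWindowOperator(long windowSize) {
        this.windowSize = windowSize;
    }

    // Assign the element to its (tumbling) window based on its timestamp.
    void processElement(long timestamp, long value) {
        long windowStart = timestamp - (timestamp % windowSize);
        inFlight.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(value);
    }

    // Emit every in-flight window whose end is at or before the watermark;
    // by the watermark's contract, no earlier elements can arrive anymore.
    List<List<Long>> processWatermark(long watermark) {
        List<List<Long>> emitted = new ArrayList<>();
        while (!inFlight.isEmpty()
                && inFlight.firstKey() + windowSize <= watermark) {
            emitted.add(inFlight.pollFirstEntry().getValue());
        }
        return emitted;
    }
}
```

Note that an element with timestamp 2 still lands in the correct window even if it arrives after an element with timestamp 5; arrival order only matters for the watermark, which must not overtake pending elements.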
> >
> > For measuring throughput I did the following: The source emits tuples of
> > the form ("tuple", 1) in an infinite loop. The window operator sums up
> the
> > tuples, thereby counting how many tuples the window operator can handle
> in
> > a given time window. There are two different implementations for the
> > summation: 1) simply summing up the values in a mapWindow(), where you get
> > a List of all tuples and simply iterate over it; 2) using sum(1), which is
> > implemented as a reduce() (that uses the pre-reducer optimisations).
> >
> > These are the performance numbers (Current is the current implementation,
> > Next is my proof-of-concept):
> >
> > Tumbling (1 sec):
> >  - Current/Map: 1.6 mio
> >  - Current/Reduce: 2 mio
> >  - Next/Map: 2.2 mio
> >  - Next/Reduce: 4 mio
> >
> > Sliding (5 sec, slide 1 sec):
> >  - Current/Map: ca 3 mio (fluctuates a lot)
> >  - Current/Reduce: No output
> >  - Next/Map: ca 4 mio (fluctuates)
> >  - Next/Reduce: 10 mio
> >
> > The Next/Reduce variant can basically scale indefinitely with window size
> > because the internal state does not rely on the number of elements (it is
> > just the current sum). The pre-reducer for sliding elements cannot handle
> > the amount of tuples, it produces no output. For the two Map variants the
> > performance fluctuates because they always keep all the elements in an
> > internal buffer before emission, this seems to tax the garbage collector
> a
> > bit and leads to random pauses.
> >
> > One thing that should be noted is that I had to disable the fake-element
> > emission thread, otherwise the Current versions would deadlock.
> >
> > So, I started working on this because I thought that out-of-order
> > processing would be necessary for correctness. And it certainly is. But the
> > proof-of-concept also shows that performance can be greatly improved.
> >
> > On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <[hidden email]> wrote:
> >>
> >> I agree, let's separate these topics from each other so we can get faster
> >> resolution.
> >>
> >> There is already a state discussion in the thread we started with Paris.
> >>
> >> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <[hidden email]>
> > wrote:
> >>
> >> > I agree with supporting out-of-order out of the box :-), even if this
> > means
> >> > a major refactoring. This is the right time to refactor the streaming
> > API
> >> > before we pull it out of beta. I think that this is more important
> than
> > new
> >> > features in the streaming API, which can be prioritized once the API
> is
> > out
> >> > of beta (meaning, that IMO this is the right time to stall PRs until
> we
> >> > agree on the design).
> >> >
> >> > There are three sections in the document: windowing, state, and API. How
> >> > intertwined are those with each other? Can we separate the discussion, or do
> >> > we need to discuss those all together? I think part of the difficulty is
> >> > that we are discussing three design choices at once.
> >> >
> >> > On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <[hidden email]>
> >> > wrote:
> >> >
> >> > > Out of order is ubiquitous in the real-world.  Typically, what
> > happens is
> >> > > that businesses will declare a maximum allowable delay for delayed
> >> > > transactions and will commit to results when that delay is reached.
> >> > > Transactions that arrive later than this cutoff are collected
> > specially
> >> > as
> >> > > corrections which are reported/used when possible.
> >> > >
> >> > > Clearly, ordering can also be violated during processing, but if the
> > data
> >> > > is originally out of order the situation can't be repaired by any
> >> > protocol
> >> > > fixes that prevent transactions from becoming disordered, but has to be
> >> > > handled at the data level.
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <
> [hidden email]
> >>
> >> > > wrote:
> >> > >
> >> > > > I also don't like big changes but sometimes they are necessary.
> The
> >> > > reason
> >> > > > why I'm so adamant about out-of-order processing is that
> > out-of-order
> >> > > > elements are not some exception that occurs once in a while; they
> > occur
> >> > > > constantly in a distributed system. For example, in this:
> >> > > > https://gist.github.com/aljoscha/a367012646ab98208d27 the
> resulting
> >> > > > windows
> >> > > > are completely bogus because the current windowing system assumes
> >> > > elements
> >> > > > to globally arrive in order, which is simply not true. (The
> example
> >> > has a
> >> > > > source that generates increasing integers. Then these pass
> through a
> >> > map
> >> > > > and are unioned with the original DataStream before a window
> > operator.)
> >> > > > This simulates elements arriving from different operators at a
> >> > windowing
> >> > > > operator. The example is also DOP=1, I imagine this to get worse
> > with
> >> > > > higher DOP.
> >> > > >
> >> > > > What do you mean by costly? As I said, I have a proof-of-concept
> >> > > windowing
> >> > > > operator that can handle out-of-order elements. This is an example
> >> > using
> >> > > > the current Flink API:
> >> > > > https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
> >> > > > (It is an infinite source of tuples and a 5 second window operator
> > that
> >> > > > counts the tuples.) The first problem is that this code deadlocks
> >> > because
> >> > > > of the thread that emits fake elements. If I disable the fake
> > element
> >> > > code
> >> > > > it works, but the throughput using my mockup is 4 times higher. The gap
> >> > > > widens dramatically if the window size increases.
> >> > > >
> >> > > > So, it actually increases performance (unless I'm making a mistake
> > in
> >> > my
> >> > > > explorations) and can handle elements that arrive out-of-order
> > (which
> >> > > > happens basically always in real-world windowing use-cases).
> >> > > >
> >> > > >
> >> > > > On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <[hidden email]>
> wrote:
> >> > > >
> >> > > > > What I like a lot about Aljoscha's proposed design is that we
> > need no
> >> > > > > different code for "system time" vs. "event time". It only
> > differs in
> >> > > > where
> >> > > > > the timestamps are assigned.
> >> > > > >
> >> > > > > The OOP approach also gives you the semantics of total ordering
> >> > without
> >> > > > > imposing merges on the streams.
> >> > > > >
> >> > > > > On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
> >> > > > > [hidden email]> wrote:
> >> > > > >
> >> > > > > > I agree that there should be multiple alternatives the user(!)
> > can
> >> > > > > > choose from. Partial out-of-order processing works for
> many/most
> >> > > > > > aggregates. However, if you consider Event-Pattern-Matching,
> > global
> >> > > > > > ordering is necessary (even if the performance penalty might be
> >> > > > > > high).
> >> > > > > >
> >> > > > > > I would also keep "system-time windows" as an alternative to
> >> > "source
> >> > > > > > assigned ts-windows".
> >> > > > > >
> >> > > > > > It might also be interesting to consider the following paper
> for
> >> > > > > > overlapping windows: "Resource sharing in continuous
> > sliding-window
> >> > > > > > aggregates"
> >> > > > > >
> >> > > > > > > https://dl.acm.org/citation.cfm?id=1316720
> >> > > > > >
> >> > > > > > -Matthias
> >> > > > > >
> >> > > > > > On 06/23/2015 10:37 AM, Gyula Fóra wrote:
> >> > > > > > > Hey
> >> > > > > > >
> >> > > > > > > I think we should not block PRs unnecessarily if your
> > suggested
> >> > > > changes
> >> > > > > > > might touch them at some point.
> >> > > > > > >
> >> > > > > > > Also I still think we should not put everything in the
> > Datastream
> >> > > > > because
> >> > > > > > > it will be a huge mess.
> >> > > > > > >
> >> > > > > > > Also we need to agree on the out of order processing,
> whether
> > we
> >> > > want
> >> > > > > it
> >> > > > > > > the way you proposed it (which is quite costly). Another
> >> > > > > > > alternative approach, which fits in the current windowing, is to
> >> > > > > > > filter out out-of-order events and apply a special handling
> >> > > > > > > operator on them. This would be fairly lightweight.
> >> > > > > > >
> >> > > > > > > My point is that we need to consider some alternative
> > solutions.
> >> > > And
> >> > > > we
> >> > > > > > > should not block contributions along the way.
> >> > > > > > >
> >> > > > > > > Cheers
> >> > > > > > > Gyula
> >> > > > > > >
> >> > > > > > > On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <
> >> > > > [hidden email]>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > >> The reason I posted this now is that we need to think about
> > the
> >> > > API
> >> > > > > and
> >> > > > > > >> windowing before proceeding with the PRs of Gabor (inverse
> >> > reduce)
> >> > > > and
> >> > > > > > >> Gyula (removal of "aggregate" functions on DataStream).
> >> > > > > > >>
> >> > > > > > >> For the windowing, I think that the current model does not
> > work
> >> > > for
> >> > > > > > >> out-of-order processing. Therefore, the whole windowing
> >> > > > infrastructure
> >> > > > > > will
> >> > > > > > >> basically have to be redone. Meaning also that any work on
> > the
> >> > > > > > >> pre-aggregators or optimizations that we do now becomes
> > useless.
> >> > > > > > >>
> >> > > > > > >> For the API, I proposed to restructure the interactions
> > between
> >> > > all
> >> > > > > the
> >> > > > > > >> different *DataStream classes and grouping/windowing. (See
> > API
> >> > > > section
> >> > > > > > of
> >> > > > > > >> the doc I posted.)
> >> > > > > > >>
> >> > > > > > >> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <
> [hidden email]
> >>
> >> > > > wrote:
> >> > > > > > >>
> >> > > > > > >>> Hi Aljoscha,
> >> > > > > > >>>
> >> > > > > > >>> Thanks for the nice summary, this is a very good
> initiative.
> >> > > > > > >>>
> >> > > > > > >>> I added some comments to the respective sections (where I
> > didnt
> >> > > > fully
> >> > > > > > >> agree
> >> > > > > > >>> :).).
> >> > > > > > >>> At some point I think it would be good to have a public
> > hangout
> >> > > > > session
> >> > > > > > >> on
> >> > > > > > >>> this, which could make a more dynamic discussion.
> >> > > > > > >>>
> >> > > > > > >>> Cheers,
> >> > > > > > >>> Gyula
> >> > > > > > >>>
> >> > > > > > >>> Aljoscha Krettek <[hidden email]> ezt írta (időpont:
> >> > 2015.
> >> > > > jún.
> >> > > > > > >> 22.,
> >> > > > > > >>> H, 21:34):
> >> > > > > > >>>
> >> > > > > > >>>> Hi,
> >> > > > > > >>>> with people proposing changes to the streaming part I
> also
> >> > > wanted
> >> > > > to
> >> > > > > > >>> throw
> >> > > > > > >>>> my hat into the ring. :D
> >> > > > > > >>>>
> >> > > > > > >>>> During the last few months, while I was getting
> acquainted
> >> > with
> >> > > > the
> >> > > > > > >>>> streaming system, I wrote down some thoughts I had about
> > how
> >> > > > things
> >> > > > > > >> could
> >> > > > > > >>>> be improved. Hopefully, they are in somewhat coherent
> shape
> >> > now,
> >> > > > so
> >> > > > > > >>> please
> >> > > > > > >>>> have a look if you are interested in this:
> >> > > > > > >>>>
> >> > > > > > >>>>
> >> > > > > > >>>
> >> > > > > > >>
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >
> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
> >> > > > > > >>>>
> >> > > > > > >>>> This mostly covers:
> >> > > > > > >>>>  - Timestamps assigned at sources
> >> > > > > > >>>>  - Out-of-order processing of elements in window
> operators
> >> > > > > > >>>>  - API design
> >> > > > > > >>>>
> >> > > > > > >>>> Please let me know what you think. Comment in the
> document
> > or
> >> > > here
> >> > > > > in
> >> > > > > > >> the
> >> > > > > > >>>> mailing list.
> >> > > > > > >>>>
> >> > > > > > >>>> I have a PR in the makings that would introduce source
> >> > > timestamps
> >> > > > > and
> >> > > > > > >>>> watermarks for keeping track of them. I also hacked a
> >> > > > > proof-of-concept
> >> > > > > > >>> of a
> >> > > > > > >>>> windowing system that is able to process out-of-order
> > elements
> >> > > > > using a
> >> > > > > > >>>> FlatMap operator. (It uses panes to perform efficient
> >> > > > > > >> pre-aggregations.)
> >> > > > > > >>>>
> >> > > > > > >>>> Cheers,
> >> > > > > > >>>> Aljoscha
> >> > > > > > >>>>
> >> > > > > > >>>
> >> > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
>

Re: Thoughts About Streaming

Gábor Gévay
I'm very sorry, I had a bug in the InversePreReducer. It should be
fixed now. Can you please run it again?

I also tried to reproduce some of your performance numbers, but I'm
getting only less than 1/10th of yours. For example, in the Tumbling
case, Current/Reduce produces only ~100000 for me. Do you have any
idea what I could be doing wrong? My code:
http://pastebin.com/zbEjmGhk
I am running it on a 2 GHz Core i7.

Best regards,
Gabor



Re: Thoughts About Streaming

Aljoscha Krettek-2
Yes, now this also processes about 3 mio elements (window size 5 sec, slide
1 sec), but it still fluctuates a lot between 1 mio and 5 mio.

Performance is not my main concern, however. My concern is that the current
model assumes elements to arrive in order, which is simply not true.

In your code you have these lines for specifying the window:
.window(Time.of(1l, TimeUnit.SECONDS))
.every(Time.of(1l, TimeUnit.SECONDS))

Although this semantically specifies a tumbling window of size 1 sec, I'm
afraid it uses the sliding-window logic internally (because of the
.every()).

In my tests I only have the first line.
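To make the tumbling-vs-sliding point concrete, here is a minimal sketch (plain Java, not Flink internals; the assignment formula is my assumption about how such window assignment is typically done) of which windows contain a given timestamp:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch (not Flink internals): start timestamps of all sliding windows
// [start, start + size) that contain a given non-negative timestamp.
// With slide == size this degenerates to the tumbling case: exactly one
// window per element.
class WindowAssignment {

    static List<Long> windowStartsFor(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide); // latest window start <= ts
        for (long s = lastStart; s > ts - size; s -= slide) {
            starts.add(s);
        }
        return starts;
    }

    public static void main(String[] args) {
        // "Tumbling" expressed as slide == size: one window per element.
        System.out.println(windowStartsFor(5, 4, 4));          // [4]
        // Genuinely sliding (size 5000 ms, slide 1000 ms): five panes.
        System.out.println(windowStartsFor(4500, 5000, 1000)); // [4000, 3000, 2000, 1000, 0]
    }
}
```

With slide == size the loop body runs exactly once, so the tumbling case never needs the multi-pane bookkeeping of the sliding path.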


On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <[hidden email]> wrote:

> I'm very sorry, I had a bug in the InversePreReducer. It should be
> fixed now. Can you please run it again?
>
> I also tried to reproduce some of your performance numbers, but I'm
> getting only less than 1/10th of yours. For example, in the Tumbling
> case, Current/Reduce produces only ~100000 for me. Do you have any
> idea what I could be doing wrong? My code:
> http://pastebin.com/zbEjmGhk
> I am running it on a 2 GHz Core i7.
>
> Best regards,
> Gabor
>
>
> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <[hidden email]>:
> > Hi,
> > I also ran the tests on top of PR 856 (inverse reducer) now. The results
> > seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all
> > the previous tests reported around 3600 tuples (size 5 sec, slide 1 sec).
> > (Theoretically there would be 5000 tuples in 5 seconds; the shortfall is
> > due to overhead.) These are the results for the inverse-reduce optimisation:
> > (Tuple 0,38)
> > (Tuple 0,829)
> > (Tuple 0,1625)
> > (Tuple 0,2424)
> > (Tuple 0,3190)
> > (Tuple 0,3198)
> > (Tuple 0,-339368)
> > (Tuple 0,-1315725)
> > (Tuple 0,-2932932)
> > (Tuple 0,-5082735)
> > (Tuple 0,-7743256)
> > (Tuple 0,75701046)
> > (Tuple 0,642829470)
> > (Tuple 0,2242018381)
> > (Tuple 0,5190708618)
> > (Tuple 0,10060360311)
> > (Tuple 0,-94254951)
> > (Tuple 0,-219806321293)
> > (Tuple 0,-1258895232699)
> > (Tuple 0,-4074432596329)
> >
> > One line is one emitted window count. This is what happens when I remove
> > the Thread.sleep(1):
> > (Tuple 0,660676)
> > (Tuple 0,2553733)
> > (Tuple 0,3542696)
> > (Tuple 0,1)
> > (Tuple 0,1107035)
> > (Tuple 0,2549491)
> > (Tuple 0,4100387)
> > (Tuple 0,-8406583360092)
> > (Tuple 0,-8406582150743)
> > (Tuple 0,-8406580427190)
> > (Tuple 0,-8406580427190)
> > (Tuple 0,-8406580427190)
> > (Tuple 0,6847279255682044995)
> > (Tuple 0,6847279255682044995)
> > (Tuple 0,-5390528042713628318)
> > (Tuple 0,-5390528042711551780)
> > (Tuple 0,-5390528042711551780)
> >
> > So at some point the pre-reducer seems to go haywire and does not recover
> > from it. The good thing is that it does produce results now, where the
> > previous Current/Reduce would simply hang and not produce any output.
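For reference, the idea behind such an inverse-reduce pre-aggregation (sketched here from the discussion, not taken from PR 856) is to keep one running aggregate, adding arriving values and subtracting evicted ones; a single missed or doubled eviction corrupts the aggregate permanently, which is consistent with the diverging counts above:

```java
import java.util.ArrayDeque;

// Sketch of an inverse-reduce sliding sum: add each arriving value,
// subtract values as they are evicted from the window. Correctness
// hinges entirely on evictions mirroring arrivals exactly.
class InverseReduceSum {
    private final ArrayDeque<Long> window = new ArrayDeque<>();
    private final int size;
    private long sum = 0;

    InverseReduceSum(int size) { this.size = size; }

    long add(long value) {
        window.addLast(value);
        sum += value;                    // forward reduce
        if (window.size() > size) {
            sum -= window.removeFirst(); // inverse reduce on eviction
        }
        return sum;
    }

    public static void main(String[] args) {
        InverseReduceSum s = new InverseReduceSum(3);
        // Sliding sum over the last 3 of the values 1..5:
        for (long v = 1; v <= 5; v++) {
            System.out.println(s.add(v)); // 1, 3, 6, 9, 12
        }
    }
}
```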
> >
> > On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <[hidden email]> wrote:
> >
> >> Hello,
> >>
> >> Aljoscha, can you please try the performance test of Current/Reduce
> >> with the InversePreReducer in PR 856? (If you just call sum, it will
> >> use an InversePreReducer.) It would be an interesting test, because
> >> the inverse function optimization really depends on the stream being
> >> ordered, and I think it has the potential of being faster than
> >> Next/Reduce, especially if the window size is much larger than the
> >> slide size.
> >>
> >> Best regards,
> >> Gabor
> >>
> >>
> >> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <[hidden email]>:
> >> > I think I'll have to elaborate a bit so I created a proof-of-concept
> >> > implementation of my ideas and ran some throughput measurements to
> >> > alleviate concerns about performance.
> >> >
> >> > First, though, I want to highlight again why the current approach does
> >> not
> >> > work with out-of-order elements (which, again, occur constantly due to
> >> the
> >> > distributed nature of the system). This is the example I posted
> earlier:
> >> > https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks
> >> like
> >> > this:
> >> >
> >> > +--+
> >> > | | Source
> >> > +--+
> >> > |
> >> > +-----+
> >> > | |
> >> > | +--+
> >> > | | | Identity Map
> >> > | +--+
> >> > | |
> >> > +-----+
> >> > |
> >> > +--+
> >> > | | Window
> >> > +--+
> >> > |
> >> > |
> >> > +--+
> >> > | | Sink
> >> > +--+
> >> >
> >> > So all it does is pass the elements through an identity map and then
> >> merge
> >> > them again before the window operator. The source emits ascending
> >> integers
> >> > and the window operator has a custom timestamp extractor that uses the
> >> > integer itself as the timestamp and should create windows of size 4
> >> > (that is, elements with timestamp 0-3 are one window, the next are the
> >> > elements with timestamp 4-7, and so on). Since the topology basically
> >> > doubles the elements from the source I would expect to get these windows:
> >> > Window: 0, 0, 1, 1, 2, 2, 3, 3
> >> > Window: 4, 4, 5, 5, 6, 6, 7, 7
> >> >
> >> > The output is this, however:
> >> > Window: 0, 1, 2, 3,
> >> > Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
> >> > Window: 8, 9, 10, 11,
> >> > Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
> >> > Window: 16, 17, 18, 19,
> >> > Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
> >> > Window: 24, 25, 26, 27,
> >> > Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
> >> >
> >> > The reason is that the elements simply arrive out-of-order. Imagine
> what
> >> > would happen if the elements actually arrived with some delay from
> >> > different operations.
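The interleaving can be simulated without Flink at all. In the hypothetical sketch below, two branches emit the same in-order sequence, but one branch starts delivering a few elements late; the merged arrival order is no longer sorted by timestamp:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: merge two in-order branches where the second one is delayed
// by `lag` elements (assumes lag < direct.size()). The merged arrival
// order mixes old and new timestamps, so windows built from arrival
// order mix elements belonging to different event-time windows.
class OutOfOrderUnion {
    static List<Integer> merge(List<Integer> direct, List<Integer> delayed, int lag) {
        List<Integer> arrival = new ArrayList<>();
        int i = 0, j = 0;
        while (i < direct.size() || j < delayed.size()) {
            if (i < direct.size()) arrival.add(direct.get(i++));
            // The delayed branch only starts delivering after `lag` elements.
            if (i > lag && j < delayed.size()) arrival.add(delayed.get(j++));
        }
        return arrival;
    }

    public static void main(String[] args) {
        List<Integer> src = List.of(0, 1, 2, 3, 4, 5, 6, 7);
        System.out.println(merge(src, src, 4));
        // [0, 1, 2, 3, 4, 0, 5, 1, 6, 2, 7, 3, 4, 5, 6, 7]
        // Arrival-order windows over this stream mix timestamps from
        // different event-time windows, much like the output above.
    }
}
```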
> >> >
> >> > Now, on to the performance numbers. The proof-of-concept I created is
> >> > available here:
> >> > https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The
> >> basic
> >> > idea is that sources assign the current timestamp when emitting
> elements.
> >> > They also periodically emit watermarks that tell us that no elements
> with
> >> > an earlier timestamp will be emitted. The watermarks propagate through
> >> the
> >> > operators. The window operator looks at the timestamp of an element
> and
> >> > puts it into the buffer that corresponds to that window. When the
> window
> >> > operator receives a watermark it will look at the in-flight windows
> >> > (basically the buffers) and emit those windows where the window-end is
> >> > before the watermark.
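The buffering-plus-watermark mechanism described in the quoted paragraph can be sketched independently of Flink (class and method names here are illustrative, not the proof-of-concept's API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch: buffer elements per tumbling window, keyed by window start;
// when a watermark arrives, emit every in-flight window whose end is
// <= the watermark, regardless of the order elements arrived in.
class WatermarkWindows {
    final long size;
    final TreeMap<Long, List<Long>> inFlight = new TreeMap<>(); // start -> elements
    final List<List<Long>> emitted = new ArrayList<>();

    WatermarkWindows(long size) { this.size = size; }

    void element(long ts) {
        inFlight.computeIfAbsent(ts - (ts % size), k -> new ArrayList<>()).add(ts);
    }

    void watermark(long wm) {
        // A watermark promises no further elements with timestamp < wm.
        while (!inFlight.isEmpty() && inFlight.firstKey() + size <= wm) {
            emitted.add(inFlight.pollFirstEntry().getValue());
        }
    }

    public static void main(String[] args) {
        WatermarkWindows op = new WatermarkWindows(4);
        for (long ts : new long[] {0, 1, 4, 2, 3, 5}) op.element(ts); // out of order
        op.watermark(4); // no more elements with ts < 4 will arrive
        System.out.println(op.emitted); // [[0, 1, 2, 3]] -- window [0, 4) is complete
    }
}
```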
> >> >
> >> > For measuring throughput I did the following: The source emits tuples
> of
> >> > the form ("tuple", 1) in an infinite loop. The window operator sums up
> >> the
> >> > tuples, thereby counting how many tuples the window operator can
> handle
> >> in
> >> > a given time window. There are two different implementations for the
> >> > summation: 1) simply summing up the values in a mapWindow(), where you
> >> > get a List of all tuples and simply iterate over it; 2) using sum(1),
> >> > which is implemented as a reduce() (that uses the pre-reducer
> >> > optimisations).
> >> >
> >> > These are the performance numbers (Current is the current
> implementation,
> >> > Next is my proof-of-concept):
> >> >
> >> > Tumbling (1 sec):
> >> >  - Current/Map: 1.6 mio
> >> >  - Current/Reduce: 2 mio
> >> >  - Next/Map: 2.2 mio
> >> >  - Next/Reduce: 4 mio
> >> >
> >> > Sliding (5 sec, slide 1 sec):
> >> >  - Current/Map: ca 3 mio (fluctuates a lot)
> >> >  - Current/Reduce: No output
> >> >  - Next/Map: ca 4 mio (fluctuates)
> >> >  - Next/Reduce: 10 mio
> >> >
> >> > The Next/Reduce variant can basically scale indefinitely with window
> size
> >> > because the internal state does not rely on the number of elements
> (it is
> >> > just the current sum). The pre-reducer for sliding elements cannot
> handle
> >> > the amount of tuples, it produces no output. For the two Map variants
> the
> >> > performance fluctuates because they always keep all the elements in an
> >> > internal buffer before emission, this seems to tax the garbage
> collector
> >> a
> >> > bit and leads to random pauses.
> >> >
> >> > One thing that should be noted is that I had to disable the
> fake-element
> >> > emission thread, otherwise the Current versions would deadlock.
> >> >
> >> > So, I started working on this because I thought that out-of-order
> >> > processing would be necessary for correctness. And it certainly is. But
> >> > the proof-of-concept also shows that performance can be greatly improved.
> >> >
> >> > On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <[hidden email]> wrote:
> >> >>
> >> >> I agree, let's separate these topics from each other so we can get
> faster
> >> >> resolution.
> >> >>
> >> >> There is already a state discussion in the thread we started with
> Paris.
> >> >>
> >> >> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <[hidden email]>
> >> > wrote:
> >> >>
> >> >> > I agree with supporting out-of-order out of the box :-), even if
> this
> >> > means
> >> >> > a major refactoring. This is the right time to refactor the
> streaming
> >> > API
> >> >> > before we pull it out of beta. I think that this is more important
> >> than
> >> > new
> >> >> > features in the streaming API, which can be prioritized once the
> API
> >> is
> >> > out
> >> >> > of beta (meaning, that IMO this is the right time to stall PRs
> until
> >> we
> >> >> > agree on the design).
> >> >> >
> >> >> > There are three sections in the document: windowing, state, and
> API.
> >> How
> >> >> > convoluted are those with each other? Can we separate the
> discussion
> >> or
> >> > do
> >> >> > we need to discuss those all together? I think part of the
> difficulty
> >> is
> >> >> > that we are discussing three design choices at once.
> >> >> >
> >> >> > On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <
> [hidden email]>
> >> >> > wrote:
> >> >> >
> >> >> > > Out of order is ubiquitous in the real-world.  Typically, what
> >> > happens is
> >> >> > > that businesses will declare a maximum allowable delay for
> delayed
> >> >> > > transactions and will commit to results when that delay is
> reached.
> >> >> > > Transactions that arrive later than this cutoff are collected
> >> > specially
> >> >> > as
> >> >> > > corrections which are reported/used when possible.
> >> >> > >
> >> >> > > Clearly, ordering can also be violated during processing, but if
> >> >> > > the data is originally out of order the situation can't be repaired
> >> >> > > by any protocol fixes that prevent transactions from becoming
> >> >> > > disordered; it has to be handled at the data level.
> >> >> > >
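The cutoff-plus-corrections scheme Ted describes can be sketched as follows (all names are illustrative, not an existing API):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: a window's result is committed once the maximum allowable
// delay has passed; transactions arriving after the cutoff are not
// folded into the committed result but collected as corrections.
class DelayedCommit {
    private final long commitTime; // window end + maximum allowable delay
    private long result = 0;
    private boolean committed = false;
    final List<Long> corrections = new ArrayList<>();

    DelayedCommit(long commitTime) { this.commitTime = commitTime; }

    void transaction(long arrivalTime, long amount) {
        if (!committed && arrivalTime <= commitTime) {
            result += amount;
        } else {
            corrections.add(amount); // reported/used later, when possible
        }
    }

    long commit() { committed = true; return result; }

    public static void main(String[] args) {
        DelayedCommit window = new DelayedCommit(12_000); // end 10 s + 2 s delay
        window.transaction(9_000, 5);
        window.transaction(11_500, 7);   // late, but within the allowable delay
        System.out.println(window.commit());    // 12
        window.transaction(13_000, 3);   // past the cutoff -> a correction
        System.out.println(window.corrections); // [3]
    }
}
```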
> >> >> > >
> >> >> > >
> >> >> > >
> >> >> > > On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <
> >> [hidden email]
> >> >>
> >> >> > > wrote:
> >> >> > >
> >> >> > > > I also don't like big changes but sometimes they are necessary.
> >> The
> >> >> > > reason
> >> >> > > > why I'm so adamant about out-of-order processing is that
> >> > out-of-order
> >> >> > > > elements are not some exception that occurs once in a while;
> they
> >> > occur
> >> >> > > > constantly in a distributed system. For example, in this:
> >> >> > > > https://gist.github.com/aljoscha/a367012646ab98208d27 the
> >> resulting
> >> >> > > > windows
> >> >> > > > are completely bogus because the current windowing system
> assumes
> >> >> > > elements
> >> >> > > > to globally arrive in order, which is simply not true. (The
> >> example
> >> >> > has a
> >> >> > > > source that generates increasing integers. Then these pass
> >> through a
> >> >> > map
> >> >> > > > and are unioned with the original DataStream before a window
> >> > operator.)
> >> >> > > > This simulates elements arriving from different operators at a
> >> >> > windowing
> >> >> > > > operator. The example is also DOP=1, I imagine this to get
> worse
> >> > with
> >> >> > > > higher DOP.
> >> >> > > >
> >> >> > > > What do you mean by costly? As I said, I have a
> proof-of-concept
> >> >> > > windowing
> >> >> > > > operator that can handle out-or-order elements. This is an
> example
> >> >> > using
> >> >> > > > the current Flink API:
> >> >> > > > https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
> >> >> > > > (It is an infinite source of tuples and a 5 second window
> operator
> >> > that
> >> >> > > > counts the tuples.) The first problem is that this code
> deadlocks
> >> >> > because
> >> >> > > > of the thread that emits fake elements. If I disable the fake
> >> > element
> >> >> > > code
> >> >> > > > it works, but the throughput using my mockup is 4 times higher
> .
> >> The
> >> >> > gap
> >> >> > > > widens dramatically if the window size increases.
> >> >> > > >
> >> >> > > > So, it actually increases performance (unless I'm making a
> mistake
> >> > in
> >> >> > my
> >> >> > > > explorations) and can handle elements that arrive out-of-order
> >> > (which
> >> >> > > > happens basically always in a real-world windowing use-cases).
> >> >> > > >
> >> >> > > >
> >> >> > > > On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <[hidden email]>
> >> wrote:
> >> >> > > >
> >> >> > > > > What I like a lot about Aljoscha's proposed design is that we
> >> > need no
> >> >> > > > > different code for "system time" vs. "event time". It only
> >> > differs in
> >> >> > > > where
> >> >> > > > > the timestamps are assigned.
> >> >> > > > >
> >> >> > > > > The OOP approach also gives you the semantics of total
> ordering
> >> >> > without
> >> >> > > > > imposing merges on the streams.
> >> >> > > > >
> >> >> > > > > On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
> >> >> > > > > [hidden email]> wrote:
> >> >> > > > >
> >> >> > > > > > I agree that there should be multiple alternatives the
> user(!)
> >> > can
> >> >> > > > > > choose from. Partial out-of-order processing works for
> >> many/most
> >> >> > > > > > aggregates. However, if you consider Event-Pattern-Matching,
> >> >> > > > > > global ordering is necessary (even if the performance penalty
> >> >> > > > > > might be high).
> >> >> > > > > >
> >> >> > > > > > I would also keep "system-time windows" as an alternative
> to
> >> >> > "source
> >> >> > > > > > assigned ts-windows".
> >> >> > > > > >
> >> >> > > > > > It might also be interesting to consider the following
> paper
> >> for
> >> >> > > > > > overlapping windows: "Resource sharing in continuous
> >> > sliding-window
> >> >> > > > > > aggregates"
> >> >> > > > > >
> >> >> > > > > > > https://dl.acm.org/citation.cfm?id=1316720
> >> >> > > > > >
> >> >> > > > > > -Matthias
> >> >> > > > > >
> >> >> > > > > > On 06/23/2015 10:37 AM, Gyula Fóra wrote:
> >> >> > > > > > > Hey
> >> >> > > > > > >
> >> >> > > > > > > I think we should not block PRs unnecessarily if your
> >> > suggested
> >> >> > > > changes
> >> >> > > > > > > might touch them at some point.
> >> >> > > > > > >
> >> >> > > > > > > Also I still think we should not put everything in the
> >> > Datastream
> >> >> > > > > because
> >> >> > > > > > > it will be a huge mess.
> >> >> > > > > > >
> >> >> > > > > > > Also we need to agree on the out of order processing,
> >> whether
> >> > we
> >> >> > > want
> >> >> > > > > it
> >> >> > > > > > > the way you proposed it (which is quite costly). Another
> >> >> > > > > > > alternative approach, which fits in the current windowing, is
> >> >> > > > > > > to filter out out-of-order events and apply a special
> >> >> > > > > > > handling operator on them. This would be fairly lightweight.
> >> >> > > > > > >
> >> >> > > > > > > My point is that we need to consider some alternative
> >> > solutions.
> >> >> > > And
> >> >> > > > we
> >> >> > > > > > > should not block contributions along the way.
> >> >> > > > > > >
> >> >> > > > > > > Cheers
> >> >> > > > > > > Gyula
> >> >> > > > > > >
> >> >> > > > > > > On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <
> >> >> > > > [hidden email]>
> >> >> > > > > > > wrote:
> >> >> > > > > > >
> >> >> > > > > > >> The reason I posted this now is that we need to think
> about
> >> > the
> >> >> > > API
> >> >> > > > > and
> >> >> > > > > > >> windowing before proceeding with the PRs of Gabor
> (inverse
> >> >> > reduce)
> >> >> > > > and
> >> >> > > > > > >> Gyula (removal of "aggregate" functions on DataStream).
> >> >> > > > > > >>
> >> >> > > > > > >> For the windowing, I think that the current model does
> not
> >> > work
> >> >> > > for
> >> >> > > > > > >> out-of-order processing. Therefore, the whole windowing
> >> >> > > > infrastructure
> >> >> > > > > > will
> >> >> > > > > > >> basically have to be redone. Meaning also that any work
> on
> >> > the
> >> >> > > > > > >> pre-aggregators or optimizations that we do now becomes
> >> > useless.
> >> >> > > > > > >>
> >> >> > > > > > >> For the API, I proposed to restructure the interactions
> >> > between
> >> >> > > all
> >> >> > > > > the
> >> >> > > > > > >> different *DataStream classes and grouping/windowing.
> (See
> >> > API
> >> >> > > > section
> >> >> > > > > > of
> >> >> > > > > > >> the doc I posted.)
> >> >> > > > > > >>
> >> >> > > > > > >> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <
> >> [hidden email]
> >> >>
> >> >> > > > wrote:
> >> >> > > > > > >>
> >> >> > > > > > >>> Hi Aljoscha,
> >> >> > > > > > >>>
> >> >> > > > > > >>> Thanks for the nice summary, this is a very good
> >> initiative.
> >> >> > > > > > >>>
> >> >> > > > > > >>> I added some comments to the respective sections
> (where I
> >> > didnt
> >> >> > > > fully
> >> >> > > > > > >> agree
> >> >> > > > > > >>> :).).
> >> >> > > > > > >>> At some point I think it would be good to have a public
> >> > hangout
> >> >> > > > > session
> >> >> > > > > > >> on
> >> >> > > > > > >>> this, which could make a more dynamic discussion.
> >> >> > > > > > >>>
> >> >> > > > > > >>> Cheers,
> >> >> > > > > > >>> Gyula
> >> >> > > > > > >>>
> >> >> > > > > > >>> Aljoscha Krettek <[hidden email]> ezt írta
> (időpont:
> >> >> > 2015.
> >> >> > > > jún.
> >> >> > > > > > >> 22.,
> >> >> > > > > > >>> H, 21:34):
> >> >> > > > > > >>>
> >> >> > > > > > >>>> Hi,
> >> >> > > > > > >>>> with people proposing changes to the streaming part I
> >> also
> >> >> > > wanted
> >> >> > > > to
> >> >> > > > > > >>> throw
> >> >> > > > > > >>>> my hat into the ring. :D
> >> >> > > > > > >>>>
> >> >> > > > > > >>>> During the last few months, while I was getting
> >> acquainted
> >> >> > with
> >> >> > > > the
> >> >> > > > > > >>>> streaming system, I wrote down some thoughts I had
> about
> >> > how
> >> >> > > > things
> >> >> > > > > > >> could
> >> >> > > > > > >>>> be improved. Hopefully, they are in somewhat coherent
> >> shape
> >> >> > now,
> >> >> > > > so
> >> >> > > > > > >>> please
> >> >> > > > > > >>>> have a look if you are interested in this:
> >> >> > > > > > >>>>
> >> >> > > > > > >>>>
> >> >> > > > > > >>>
> >> >> > > > > > >>
> >> >> > > > > >
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >
> >>
> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
> >> >> > > > > > >>>>
> >> >> > > > > > >>>> This mostly covers:
> >> >> > > > > > >>>>  - Timestamps assigned at sources
> >> >> > > > > > >>>>  - Out-of-order processing of elements in window
> >> operators
> >> >> > > > > > >>>>  - API design
> >> >> > > > > > >>>>
> >> >> > > > > > >>>> Please let me know what you think. Comment in the
> >> document
> >> > or
> >> >> > > here
> >> >> > > > > in
> >> >> > > > > > >> the
> >> >> > > > > > >>>> mailing list.
> >> >> > > > > > >>>>
> >> >> > > > > > >>>> I have a PR in the makings that would introduce source
> >> >> > > timestamps
> >> >> > > > > and
> >> >> > > > > > >>>> watermarks for keeping track of them. I also hacked a
> >> >> > > > > proof-of-concept
> >> >> > > > > > >>> of a
> >> >> > > > > > >>>> windowing system that is able to process out-of-order
> >> > elements
> >> >> > > > > using a
> >> >> > > > > > >>>> FlatMap operator. (It uses panes to perform efficient
> >> >> > > > > > >> pre-aggregations.)
> >> >> > > > > > >>>>
> >> >> > > > > > >>>> Cheers,
> >> >> > > > > > >>>> Aljoscha
> >> >> > > > > > >>>>
> >> >> > > > > > >>>
> >> >> > > > > > >>
> >> >> > > > > > >
> >> >> > > > > >
> >> >> > > > > >
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >>
>

Re: Thoughts About Streaming

Matthias J. Sax
Hi Aljoscha,

I like that you are pushing in this direction. However, IMHO you
misinterpret the current approach. It does not assume that tuples
arrive in order; the current approach has no notion of a
pre-defined order (for example, the order in which the events are
created). There is only the notion of "arrival order" at the operator.
From this "arrival order" perspective, the results are correct(!).

Windowing in the current approach means, for example, "sum up an
attribute of all events you *received* in the last 5 seconds". That is a
different meaning than "sum up an attribute of all events that *occurred*
in the last 5 seconds". Both queries are valid and Flink should support
both IMHO.
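
The distinction can be made concrete with a small sketch (plain Java, illustrative only): the same stream of event timestamps, grouped once by arrival position and once by event timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: group the same elements into tumbling windows twice -- once
// keyed by when they were *received* (arrival position) and once by
// when they *occurred* (event timestamp). Both are valid queries; they
// produce different windows.
class ReceivedVsOccurred {
    static Map<Long, List<Long>> windows(long[] events, long[] key, long size) {
        Map<Long, List<Long>> out = new TreeMap<>();
        for (int i = 0; i < events.length; i++) {
            out.computeIfAbsent(key[i] - (key[i] % size), k -> new ArrayList<>())
               .add(events[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] events  = {0, 1, 4, 2, 3, 5}; // event timestamps, in arrival order
        long[] arrival = {0, 1, 2, 3, 4, 5}; // arrival positions

        System.out.println(windows(events, arrival, 4)); // {0=[0, 1, 4, 2], 4=[3, 5]}
        System.out.println(windows(events, events, 4));  // {0=[0, 1, 2, 3], 4=[4, 5]}
    }
}
```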


-Matthias



On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:

> Yes, now this also processes about 3 mio Elements (Window Size 5 sec, Slide
> 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio.
>
> Performance is not my main concern, however. My concern is that the current
> model assumes elements to arrive in order, which is simply not true.
>
> In your code you have these lines for specifying the window:
> .window(Time.of(1l, TimeUnit.SECONDS))
> .every(Time.of(1l, TimeUnit.SECONDS))
>
> Although this semantically specifies a tumbling window of size 1 sec I'm
> afraid it uses the sliding window logic internally (because of the
> .every()).
>
> In my tests I only have the first line.
>
>
> On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <[hidden email]> wrote:
>
>> I'm very sorry, I had a bug in the InversePreReducer. It should be
>> fixed now. Can you please run it again?
>>
>> I also tried to reproduce some of your performance numbers, but I'm
>> getting only less than 1/10th of yours. For example, in the Tumbling
>> case, Current/Reduce produces only ~100000 for me. Do you have any
>> idea what I could be doing wrong? My code:
>> http://pastebin.com/zbEjmGhk
>> I am running it on a 2 GHz Core i7.
>>
>> Best regards,
>> Gabor
>>
>>
>> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <[hidden email]>:
>>> Hi,
>>> I also ran the tests on top of PR 856 (inverse reducer) now. The results
>>> seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all
>>> the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 sec)
>>> (Theoretically there would be 5000 tuples in 5 seconds but this is due to
>>> overhead). These are the results for the inverse reduce optimisation:
>>> (Tuple 0,38)
>>> (Tuple 0,829)
>>> (Tuple 0,1625)
>>> (Tuple 0,2424)
>>> (Tuple 0,3190)
>>> (Tuple 0,3198)
>>> (Tuple 0,-339368)
>>> (Tuple 0,-1315725)
>>> (Tuple 0,-2932932)
>>> (Tuple 0,-5082735)
>>> (Tuple 0,-7743256)
>>> (Tuple 0,75701046)
>>> (Tuple 0,642829470)
>>> (Tuple 0,2242018381)
>>> (Tuple 0,5190708618)
>>> (Tuple 0,10060360311)
>>> (Tuple 0,-94254951)
>>> (Tuple 0,-219806321293)
>>> (Tuple 0,-1258895232699)
>>> (Tuple 0,-4074432596329)
>>>
>>> One line is one emitted window count. This is what happens when I remove
>>> the Thread.sleep(1):
>>> (Tuple 0,660676)
>>> (Tuple 0,2553733)
>>> (Tuple 0,3542696)
>>> (Tuple 0,1)
>>> (Tuple 0,1107035)
>>> (Tuple 0,2549491)
>>> (Tuple 0,4100387)
>>> (Tuple 0,-8406583360092)
>>> (Tuple 0,-8406582150743)
>>> (Tuple 0,-8406580427190)
>>> (Tuple 0,-8406580427190)
>>> (Tuple 0,-8406580427190)
>>> (Tuple 0,6847279255682044995)
>>> (Tuple 0,6847279255682044995)
>>> (Tuple 0,-5390528042713628318)
>>> (Tuple 0,-5390528042711551780)
>>> (Tuple 0,-5390528042711551780)
>>>
>>> So at some point the pre-reducer seems to go haywire and does not recover
>>> from it. The good thing is that it does produce results now, where the
>>> previous Current/Reduce would simply hang and not produce any output.
>>>
>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <[hidden email]> wrote:
>>>
>>>> Hello,
>>>>
>>>> Aljoscha, can you please try the performance test of Current/Reduce
>>>> with the InversePreReducer in PR 856? (If you just call sum, it will
>>>> use an InversePreReducer.) It would be an interesting test, because
>>>> the inverse function optimization really depends on the stream being
>>>> ordered, and I think it has the potential of being faster then
>>>> Next/Reduce. Especially if the window size is much larger than the
>>>> slide size.
>>>>
>>>> Best regards,
>>>> Gabor
>>>>
>>>>
>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <[hidden email]>:
>>>>> I think I'll have to elaborate a bit so I created a proof-of-concept
>>>>> implementation of my Ideas and ran some throughput measurements to
>>>>> alleviate concerns about performance.
>>>>>
>>>>> First, though, I want to highlight again why the current approach does
>>>> not
>>>>> work with out-of-order elements (which, again, occur constantly due to
>>>> the
>>>>> distributed nature of the system). This is the example I posted
>> earlier:
>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks
>>>> like
>>>>> this:
>>>>>
>>>>> +--+
>>>>> | | Source
>>>>> +--+
>>>>> |
>>>>> +-----+
>>>>> | |
>>>>> | +--+
>>>>> | | | Identity Map
>>>>> | +--+
>>>>> | |
>>>>> +-----+
>>>>> |
>>>>> +--+
>>>>> | | Window
>>>>> +--+
>>>>> |
>>>>> |
>>>>> +--+
>>>>> | | Sink
>>>>> +--+
>>>>>
>>>>> So all it does is pass the elements through an identity map and then
>>>> merge
>>>>> them again before the window operator. The source emits ascending
>>>> integers
>>>>> and the window operator has a custom timestamp extractor that uses the
>>>>> integer itself as the timestamp and should create windows of size 4
>> (that
>>>>> is elements with timestamp 0-3 are one window, the next are the
>> elements
>>>>> with timestamp 4-8, and so on). Since the topology basically doubles
>> the
>>>>> elements form the source I would expect to get these windows:
>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3
>>>>> Window: 4, 4, 6, 6, 7, 7, 8, 8
>>>>>
>>>>> The output is this, however:
>>>>> Window: 0, 1, 2, 3,
>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
>>>>> Window: 8, 9, 10, 11,
>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
>>>>> Window: 16, 17, 18, 19,
>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
>>>>> Window: 24, 25, 26, 27,
>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
>>>>>
>>>>> The reason is that the elements simply arrive out-of-order. Imagine
>> what
>>>>> would happen if the elements actually arrived with some delay from
>>>>> different operations.
>>>>>
>>>>> Now, on to the performance numbers. The proof-of-concept I created is
>>>>> available here:
>>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The
>>>> basic
>>>>> idea is that sources assign the current timestamp when emitting
>> elements.
>>>>> They also periodically emit watermarks that tell us that no elements
>> with
>>>>> an earlier timestamp will be emitted. The watermarks propagate through
>>>> the
>>>>> operators. The window operator looks at the timestamp of an element
>> and
>>>>> puts it into the buffer that corresponds to that window. When the
>> window
>>>>> operator receives a watermark it will look at the in-flight windows
>>>>> (basically the buffers) and emit those windows where the window-end is
>>>>> before the watermark.
>>>>>
>>>>> For measuring throughput I did the following: The source emits tuples
>> of
>>>>> the form ("tuple", 1) in an infinite loop. The window operator sums up
>>>> the
>>>>> tuples, thereby counting how many tuples the window operator can
>> handle
>>>> in
>>>>> a given time window. There are two different implementations for the
>>>>> summation: 1) simply summing up the values in a mapWindow(), there you
>>>> get
>>>>> a List of all tuples and simple iterate over it. 2) using sum(1),
>> which
>>>> is
>>>>> implemented as a reduce() (that uses the pre-reducer optimisations).
>>>>>
>>>>> These are the performance numbers (Current is the current
>> implementation,
>>>>> Next is my proof-of-concept):
>>>>>
>>>>> Tumbling (1 sec):
>>>>>  - Current/Map: 1.6 mio
>>>>>  - Current/Reduce: 2 mio
>>>>>  - Next/Map: 2.2 mio
>>>>>  - Next/Reduce: 4 mio
>>>>>
>>>>> Sliding (5 sec, slide 1 sec):
>>>>>  - Current/Map: ca 3 mio (fluctuates a lot)
>>>>>  - Current/Reduce: No output
>>>>>  - Next/Map: ca 4 mio (fluctuates)
>>>>>  - Next/Reduce: 10 mio
>>>>>
>>>>> The Next/Reduce variant can basically scale indefinitely with window
>> size
>>>>> because the internal state does not rely on the number of elements
>> (it is
>>>>> just the current sum). The pre-reducer for sliding elements cannot
>> handle
>>>>> the amount of tuples, it produces no output. For the two Map variants
>> the
>>>>> performance fluctuates because they always keep all the elements in an
>>>>> internal buffer before emission, this seems to tax the garbage
>> collector
>>>> a
>>>>> bit and leads to random pauses.
>>>>>
>>>>> One thing that should be noted is that I had to disable the
>> fake-element
>>>>> emission thread, otherwise the Current versions would deadlock.
>>>>>
>>>>> So, I started working on this because I thought that out-of-order
>>>>> processing would be necessary for correctness. And it is certainly,
>> But
>>>> the
>>>>> proof-of-concept also shows that performance can be greatly improved.
>>>>>
>>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <[hidden email]> wrote:
>>>>>>
>>>>>> I agree lets separate these topics from each other so we can get
>> faster
>>>>>> resolution.
>>>>>>
>>>>>> There is already a state discussion in the thread we started with
>> Paris.
>>>>>>
>>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <[hidden email]>
>>>>> wrote:
>>>>>>
>>>>>>> I agree with supporting out-of-order out of the box :-), even if
>> this
>>>>> means
>>>>>>> a major refactoring. This is the right time to refactor the
>> streaming
>>>>> API
>>>>>>> before we pull it out of beta. I think that this is more important
>>>> than
>>>>> new
>>>>>>> features in the streaming API, which can be prioritized once the
>> API
>>>> is
>>>>> out
>>>>>>> of beta (meaning, that IMO this is the right time to stall PRs
>> until
>>>> we
>>>>>>> agree on the design).
>>>>>>>
>>>>>>> There are three sections in the document: windowing, state, and
>> API.
>>>> How
>>>>>>> convoluted are those with each other? Can we separate the
>> discussion
>>>> or
>>>>> do
>>>>>>> we need to discuss those all together? I think part of the
>> difficulty
>>>> is
>>>>>>> that we are discussing three design choices at once.
>>>>>>>
>>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <
>> [hidden email]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Out of order is ubiquitous in the real world. Typically, what
>>>>> happens is
>>>>>>>> that businesses will declare a maximum allowable delay for
>> delayed
>>>>>>>> transactions and will commit to results when that delay is
>> reached.
>>>>>>>> Transactions that arrive later than this cutoff are collected
>>>>> specially
>>>>>>> as
>>>>>>>> corrections which are reported/used when possible.
>>>>>>>>
>>>>>>>> Clearly, ordering can also be violated during processing, but if
>> the
>>>>> data
>>>>>>>> is originally out of order the situation can't be repaired by any
>>>>>>> protocol
>>>>>>>> fixes that prevent transactions from becoming disordered but has
>> to be
>>>>>>> handled
>>>>>>>> at the data level.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <
>>>> [hidden email]
>>>>>>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I also don't like big changes but sometimes they are necessary.
>>>> The
>>>>>>>> reason
>>>>>>>>> why I'm so adamant about out-of-order processing is that
>>>>> out-of-order
>>>>>>>>> elements are not some exception that occurs once in a while;
>> they
>>>>> occur
>>>>>>>>> constantly in a distributed system. For example, in this:
>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the
>>>> resulting
>>>>>>>>> windows
>>>>>>>>> are completely bogus because the current windowing system
>> assumes
>>>>>>>> elements
>>>>>>>>> to globally arrive in order, which is simply not true. (The
>>>> example
>>>>>>> has a
>>>>>>>>> source that generates increasing integers. Then these pass
>>>> through a
>>>>>>> map
>>>>>>>>> and are unioned with the original DataStream before a window
>>>>> operator.)
>>>>>>>>> This simulates elements arriving from different operators at a
>>>>>>> windowing
>>>>>>>>> operator. The example is also DOP=1, I imagine this to get
>> worse
>>>>> with
>>>>>>>>> higher DOP.
>>>>>>>>>
>>>>>>>>> What do you mean by costly? As I said, I have a
>> proof-of-concept
>>>>>>>> windowing
>>>>>>>>> operator that can handle out-of-order elements. This is an
>> example
>>>>>>> using
>>>>>>>>> the current Flink API:
>>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
>>>>>>>>> (It is an infinite source of tuples and a 5 second window
>> operator
>>>>> that
>>>>>>>>> counts the tuples.) The first problem is that this code
>> deadlocks
>>>>>>> because
>>>>>>>>> of the thread that emits fake elements. If I disable the fake
>>>>> element
>>>>>>>> code
>>>>>>>>> it works, but the throughput using my mockup is 4 times higher.
>>>> The
>>>>>>> gap
>>>>>>>>> widens dramatically if the window size increases.
>>>>>>>>>
>>>>>>>>> So, it actually increases performance (unless I'm making a
>> mistake
>>>>> in
>>>>>>> my
>>>>>>>>> explorations) and can handle elements that arrive out-of-order
>>>>> (which
>>>>>>>>> happens basically always in real-world windowing use-cases).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <[hidden email]>
>>>> wrote:
>>>>>>>>>
>>>>>>>>>> What I like a lot about Aljoscha's proposed design is that we
>>>>> need no
>>>>>>>>>> different code for "system time" vs. "event time". It only
>>>>> differs in
>>>>>>>>> where
>>>>>>>>>> the timestamps are assigned.
>>>>>>>>>>
>>>>>>>>>> The OOP approach also gives you the semantics of total
>> ordering
>>>>>>> without
>>>>>>>>>> imposing merges on the streams.
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> I agree that there should be multiple alternatives the
>> user(!)
>>>>> can
>>>>>>>>>>> choose from. Partial out-of-order processing works for
>>>> many/most
>>>>>>>>>>> aggregates. However, if you consider
>> Event-Pattern-Matching,
>>>>> global
>>>>>>>>>>> ordering is necessary (even if the performance penalty
>> might
>>>> be
>>>>>>>> high).
>>>>>>>>>>>
>>>>>>>>>>> I would also keep "system-time windows" as an alternative
>> to
>>>>>>> "source
>>>>>>>>>>> assigned ts-windows".
>>>>>>>>>>>
>>>>>>>>>>> It might also be interesting to consider the following
>> paper
>>>> for
>>>>>>>>>>> overlapping windows: "Resource sharing in continuous
>>>>> sliding-window
>>>>>>>>>>> aggregates"
>>>>>>>>>>>
>>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>>>>>>>>>>>> Hey
>>>>>>>>>>>>
>>>>>>>>>>>> I think we should not block PRs unnecessarily if your
>>>>> suggested
>>>>>>>>> changes
>>>>>>>>>>>> might touch them at some point.
>>>>>>>>>>>>
>>>>>>>>>>>> Also I still think we should not put everything in the
>>>>> Datastream
>>>>>>>>>> because
>>>>>>>>>>>> it will be a huge mess.
>>>>>>>>>>>>
>>>>>>>>>>>> Also we need to agree on the out of order processing,
>>>> whether
>>>>> we
>>>>>>>> want
>>>>>>>>>> it
>>>>>>>>>>>> the way you proposed it (which is quite costly). Another
>>>>>>> alternative
>>>>>>>>>>>> approach there which fits in the current windowing is to
>>>>> filter
>>>>>>> out
>>>>>>>>> of
>>>>>>>>>>>> order events and apply a special handling operator on
>> them.
>>>>> This
>>>>>>>>> would
>>>>>>>>>> be
>>>>>>>>>>>> fairly lightweight.
>>>>>>>>>>>>
>>>>>>>>>>>> My point is that we need to consider some alternative
>>>>> solutions.
>>>>>>>> And
>>>>>>>>> we
>>>>>>>>>>>> should not block contributions along the way.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers
>>>>>>>>>>>> Gyula
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <
>>>>>>>>> [hidden email]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> The reason I posted this now is that we need to think
>> about
>>>>> the
>>>>>>>> API
>>>>>>>>>> and
>>>>>>>>>>>>> windowing before proceeding with the PRs of Gabor
>> (inverse
>>>>>>> reduce)
>>>>>>>>> and
>>>>>>>>>>>>> Gyula (removal of "aggregate" functions on DataStream).
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the windowing, I think that the current model does
>> not
>>>>> work
>>>>>>>> for
>>>>>>>>>>>>> out-of-order processing. Therefore, the whole windowing
>>>>>>>>> infrastructure
>>>>>>>>>>> will
>>>>>>>>>>>>> basically have to be redone. Meaning also that any work
>> on
>>>>> the
>>>>>>>>>>>>> pre-aggregators or optimizations that we do now becomes
>>>>> useless.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the API, I proposed to restructure the interactions
>>>>> between
>>>>>>>> all
>>>>>>>>>> the
>>>>>>>>>>>>> different *DataStream classes and grouping/windowing.
>> (See
>>>>> API
>>>>>>>>> section
>>>>>>>>>>> of
>>>>>>>>>>>>> the doc I posted.)
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <
>>>> [hidden email]
>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the nice summary, this is a very good
>>>> initiative.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I added some comments to the respective sections
>> (where I
>>>>> didnt
>>>>>>>>> fully
>>>>>>>>>>>>> agree
>>>>>>>>>>>>>> :).).
>>>>>>>>>>>>>> At some point I think it would be good to have a public
>>>>> hangout
>>>>>>>>>> session
>>>>>>>>>>>>> on
>>>>>>>>>>>>>> this, which could make a more dynamic discussion.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Aljoscha Krettek <[hidden email]> ezt írta
>> (időpont:
>>>>>>> 2015.
>>>>>>>>> jún.
>>>>>>>>>>>>> 22.,
>>>>>>>>>>>>>> H, 21:34):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> with people proposing changes to the streaming part I
>>>> also
>>>>>>>> wanted
>>>>>>>>> to
>>>>>>>>>>>>>> throw
>>>>>>>>>>>>>>> my hat into the ring. :D
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> During the last few months, while I was getting
>>>> acquainted
>>>>>>> with
>>>>>>>>> the
>>>>>>>>>>>>>>> streaming system, I wrote down some thoughts I had
>> about
>>>>> how
>>>>>>>>> things
>>>>>>>>>>>>> could
>>>>>>>>>>>>>>> be improved. Hopefully, they are in somewhat coherent
>>>> shape
>>>>>>> now,
>>>>>>>>> so
>>>>>>>>>>>>>> please
>>>>>>>>>>>>>>> have a look if you are interested in this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This mostly covers:
>>>>>>>>>>>>>>>  - Timestamps assigned at sources
>>>>>>>>>>>>>>>  - Out-of-order processing of elements in window
>>>> operators
>>>>>>>>>>>>>>>  - API design
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please let me know what you think. Comment in the
>>>> document
>>>>> or
>>>>>>>> here
>>>>>>>>>> in
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> mailing list.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a PR in the makings that would introduce source
>>>>>>>> timestamps
>>>>>>>>>> and
>>>>>>>>>>>>>>> watermarks for keeping track of them. I also hacked a
>>>>>>>>>> proof-of-concept
>>>>>>>>>>>>>> of a
>>>>>>>>>>>>>>> windowing system that is able to process out-of-order
>>>>> elements
>>>>>>>>>> using a
>>>>>>>>>>>>>>> FlatMap operator. (It uses panes to perform efficient
>>>>>>>>>>>>> pre-aggregations.)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>
>>
>



Re: Thoughts About Streaming

Aljoscha Krettek-2
Yes, I am aware of this requirement and it would also be supported in my
proposed model.

The problem is that the "custom timestamp" feature gives the impression
that the elements are windowed according to a user-assigned timestamp. The
results, however, are wrong because of the assumption that elements
arrive in order. (This is what I was trying to show with my fancy ASCII
art and result output.)
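To make the distinction concrete: windowing by a user-assigned timestamp means the window an element belongs to is a pure function of its timestamp, so arrival order cannot change the result. A minimal sketch of that assignment (class and method names are illustrative, not the Flink API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Sketch: assign elements to tumbling windows by their event timestamp,
// independent of arrival order. Names are illustrative, not Flink API.
public class EventTimeAssign {
    // Start of the tumbling window of the given size that contains `timestamp`.
    static long windowStart(long timestamp, long size) {
        return (timestamp / size) * size;
    }

    // Group elements (here the timestamp is the element itself, as in the
    // ascending-integer example) into windows of the given size.
    static SortedMap<Long, List<Long>> assign(List<Long> arrivals, long size) {
        SortedMap<Long, List<Long>> windows = new TreeMap<>();
        for (long e : arrivals) {
            windows.computeIfAbsent(windowStart(e, size), k -> new ArrayList<>()).add(e);
        }
        return windows;
    }

    public static void main(String[] args) {
        // Out-of-order arrival, e.g. after a union of two paths.
        List<Long> arrivals = Arrays.asList(0L, 1L, 0L, 2L, 1L, 3L, 2L, 3L);
        // Arrival order does not matter: both copies of 0..3 land in window [0, 4).
        System.out.println(assign(arrivals, 4)); // {0=[0, 1, 0, 2, 1, 3, 2, 3]}
    }
}
```

Arrival-order slicing, by contrast, cuts the stream wherever the extracted timestamps happen to wrap, which is what produces the bogus windows shown with the gist above.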

On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <[hidden email]>
wrote:

> Hi Aljoscha,
>
> I like that you are pushing in this direction. However, IMHO you
> misinterpret the current approach. It does not assume that tuples
> arrive in order; the current approach has no notion of a
> pre-defined order (for example, the order in which the events are
> created). There is only the notion of "arrival order" at the operator.
> From this "arrival-order" perspective, the results are correct(!).
>
> Windowing in the current approach means, for example, "sum up an
> attribute of all events you *received* in the last 5 seconds". That is a
> different meaning than "sum up an attribute of all events that *occurred*
> in the last 5 seconds". Both queries are valid and Flink should support
> both IMHO.
>
>
> -Matthias
>
>
>
> On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
> > Yes, now this also processes about 3 mio Elements (Window Size 5 sec,
> Slide
> > 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio.
> >
> > Performance is not my main concern, however. My concern is that the
> current
> > model assumes elements to arrive in order, which is simply not true.
> >
> > In your code you have these lines for specifying the window:
> > .window(Time.of(1l, TimeUnit.SECONDS))
> > .every(Time.of(1l, TimeUnit.SECONDS))
> >
> > Although this semantically specifies a tumbling window of size 1 sec I'm
> > afraid it uses the sliding window logic internally (because of the
> > .every()).
> >
> > In my tests I only have the first line.
> >
> >
> > On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <[hidden email]> wrote:
> >
> >> I'm very sorry, I had a bug in the InversePreReducer. It should be
> >> fixed now. Can you please run it again?
> >>
> >> I also tried to reproduce some of your performance numbers, but I'm
> >> getting only less than 1/10th of yours. For example, in the Tumbling
> >> case, Current/Reduce produces only ~100000 for me. Do you have any
> >> idea what I could be doing wrong? My code:
> >> http://pastebin.com/zbEjmGhk
> >> I am running it on a 2 GHz Core i7.
> >>
> >> Best regards,
> >> Gabor
> >>
> >>
> >> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <[hidden email]>:
> >>> Hi,
> >>> I also ran the tests on top of PR 856 (inverse reducer) now. The
> results
> >>> seem incorrect. When I insert a Thread.sleep(1) in the tuple source,
> all
> >>> the previous tests reported around 3600 tuples (Size 5 sec, Slide 1
> sec)
> >>> (Theoretically there would be 5000 tuples in 5 seconds but this is due
> to
> >>> overhead). These are the results for the inverse reduce optimisation:
> >>> (Tuple 0,38)
> >>> (Tuple 0,829)
> >>> (Tuple 0,1625)
> >>> (Tuple 0,2424)
> >>> (Tuple 0,3190)
> >>> (Tuple 0,3198)
> >>> (Tuple 0,-339368)
> >>> (Tuple 0,-1315725)
> >>> (Tuple 0,-2932932)
> >>> (Tuple 0,-5082735)
> >>> (Tuple 0,-7743256)
> >>> (Tuple 0,75701046)
> >>> (Tuple 0,642829470)
> >>> (Tuple 0,2242018381)
> >>> (Tuple 0,5190708618)
> >>> (Tuple 0,10060360311)
> >>> (Tuple 0,-94254951)
> >>> (Tuple 0,-219806321293)
> >>> (Tuple 0,-1258895232699)
> >>> (Tuple 0,-4074432596329)
> >>>
> >>> One line is one emitted window count. This is what happens when I
> remove
> >>> the Thread.sleep(1):
> >>> (Tuple 0,660676)
> >>> (Tuple 0,2553733)
> >>> (Tuple 0,3542696)
> >>> (Tuple 0,1)
> >>> (Tuple 0,1107035)
> >>> (Tuple 0,2549491)
> >>> (Tuple 0,4100387)
> >>> (Tuple 0,-8406583360092)
> >>> (Tuple 0,-8406582150743)
> >>> (Tuple 0,-8406580427190)
> >>> (Tuple 0,-8406580427190)
> >>> (Tuple 0,-8406580427190)
> >>> (Tuple 0,6847279255682044995)
> >>> (Tuple 0,6847279255682044995)
> >>> (Tuple 0,-5390528042713628318)
> >>> (Tuple 0,-5390528042711551780)
> >>> (Tuple 0,-5390528042711551780)
> >>>
> >>> So at some point the pre-reducer seems to go haywire and does not
> recover
> >>> from it. The good thing is that it does produce results now, where the
> >>> previous Current/Reduce would simply hang and not produce any output.
> >>>
> >>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <[hidden email]> wrote:
> >>>
> >>>> Hello,
> >>>>
> >>>> Aljoscha, can you please try the performance test of Current/Reduce
> >>>> with the InversePreReducer in PR 856? (If you just call sum, it will
> >>>> use an InversePreReducer.) It would be an interesting test, because
> >>>> the inverse function optimization really depends on the stream being
> >>>> ordered, and I think it has the potential of being faster then
> >>>> Next/Reduce. Especially if the window size is much larger than the
> >>>> slide size.
> >>>>
> >>>> Best regards,
> >>>> Gabor
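For context, the inverse-function optimization Gábor describes keeps a running aggregate and "un-reduces" the elements that slide out of the window instead of re-aggregating from scratch. A minimal sketch for a count-based sliding sum (this is not the actual InversePreReducer from PR 856; names are illustrative):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the inverse-function optimization for a sliding-window sum:
// keep a running total, add values as they arrive, and subtract ("inverse
// reduce") the values that fall out of the window. This only works if we
// know exactly which elements leave, i.e. it presupposes an ordered stream.
// Not the actual InversePreReducer from PR 856.
public class InverseSlidingSum {
    private final int windowCount; // window size in elements
    private final Deque<Long> window = new ArrayDeque<>();
    private long sum = 0;

    InverseSlidingSum(int windowCount) { this.windowCount = windowCount; }

    // Add a value; evict (and subtract) the oldest if the window is full.
    long add(long value) {
        window.addLast(value);
        sum += value;                    // reduce
        if (window.size() > windowCount) {
            sum -= window.removeFirst(); // inverse reduce
        }
        return sum;
    }

    public static void main(String[] args) {
        InverseSlidingSum s = new InverseSlidingSum(3);
        for (long v : new long[]{1, 2, 3, 4, 5}) {
            System.out.println(s.add(v)); // 1, 3, 6, 9, 12
        }
    }
}
```

Because every emitted value depends on all previous additions and subtractions, a single wrong inverse step corrupts the running total permanently, which is one plausible reading of the runaway counts reported elsewhere in this thread.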
> >>>>
> >>>>
> >>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <[hidden email]>:
> >>>>> I think I'll have to elaborate a bit so I created a proof-of-concept
> >>>>> implementation of my Ideas and ran some throughput measurements to
> >>>>> alleviate concerns about performance.
> >>>>>
> >>>>> First, though, I want to highlight again why the current approach
> does
> >>>> not
> >>>>> work with out-of-order elements (which, again, occur constantly due
> to
> >>>> the
> >>>>> distributed nature of the system). This is the example I posted
> >> earlier:
> >>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. The plan
> looks
> >>>> like
> >>>>> this:
> >>>>>
> >>>>> +--+
> >>>>> | | Source
> >>>>> +--+
> >>>>> |
> >>>>> +-----+
> >>>>> | |
> >>>>> | +--+
> >>>>> | | | Identity Map
> >>>>> | +--+
> >>>>> | |
> >>>>> +-----+
> >>>>> |
> >>>>> +--+
> >>>>> | | Window
> >>>>> +--+
> >>>>> |
> >>>>> |
> >>>>> +--+
> >>>>> | | Sink
> >>>>> +--+
> >>>>>
> >>>>> So all it does is pass the elements through an identity map and then
> >>>> merge
> >>>>> them again before the window operator. The source emits ascending
> >>>> integers
> >>>>> and the window operator has a custom timestamp extractor that uses
> the
> >>>>> integer itself as the timestamp and should create windows of size 4
> >>>>> (that is, elements with timestamps 0-3 form one window, the next window
> >>>>> holds the elements with timestamps 4-7, and so on). Since the topology
> >>>>> basically doubles the elements from the source, I would expect to get
> >>>>> these windows:
> >>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3
> >>>>> Window: 4, 4, 5, 5, 6, 6, 7, 7
> >>>>>
> >>>>> The output is this, however:
> >>>>> Window: 0, 1, 2, 3,
> >>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
> >>>>> Window: 8, 9, 10, 11,
> >>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
> >>>>> Window: 16, 17, 18, 19,
> >>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
> >>>>> Window: 24, 25, 26, 27,
> >>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
> >>>>>
> >>>>> The reason is that the elements simply arrive out-of-order. Imagine
> >> what
> >>>>> would happen if the elements actually arrived with some delay from
> >>>>> different operations.
> >>>>>
> >>>>> Now, on to the performance numbers. The proof-of-concept I created is
> >>>>> available here:
> >>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock.
> The
> >>>> basic
> >>>>> idea is that sources assign the current timestamp when emitting
> >> elements.
> >>>>> They also periodically emit watermarks that tell us that no elements
> >> with
> >>>>> an earlier timestamp will be emitted. The watermarks propagate
> through
> >>>> the
> >>>>> operators. The window operator looks at the timestamp of an element
> >> and
> >>>>> puts it into the buffer that corresponds to that window. When the
> >> window
> >>>>> operator receives a watermark it will look at the in-flight windows
> >>>>> (basically the buffers) and emit those windows where the window-end
> is
> >>>>> before the watermark.
> >>>>>
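The buffer-and-watermark mechanism described in the quoted paragraph can be sketched as follows. This is a simplified, self-contained illustration; the class and method names are made up, and the real operator in the proof-of-concept branch is more involved:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Sketch of a watermark-driven tumbling-window operator: elements are
// buffered per window, and a window is emitted once a watermark tells us
// that no element with an earlier timestamp can still arrive.
// Names are illustrative, not the actual Flink API.
public class WatermarkWindowOperator {
    private final long windowSize;
    // window start -> buffered elements (the in-flight windows)
    private final SortedMap<Long, List<Long>> inFlight = new TreeMap<>();
    private final List<List<Long>> emitted = new ArrayList<>();

    WatermarkWindowOperator(long windowSize) { this.windowSize = windowSize; }

    void processElement(long timestamp) {
        long start = (timestamp / windowSize) * windowSize;
        inFlight.computeIfAbsent(start, k -> new ArrayList<>()).add(timestamp);
    }

    // Emit every window whose end is at or before the watermark.
    void processWatermark(long watermark) {
        Iterator<Map.Entry<Long, List<Long>>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> e = it.next();
            if (e.getKey() + windowSize <= watermark) {
                emitted.add(e.getValue());
                it.remove();
            } else {
                break; // TreeMap is sorted; later windows cannot be complete yet
            }
        }
    }

    List<List<Long>> emitted() { return emitted; }

    public static void main(String[] args) {
        WatermarkWindowOperator op = new WatermarkWindowOperator(4);
        for (long t : new long[]{0, 1, 0, 2, 1, 3, 5, 2, 3, 4}) op.processElement(t);
        op.processWatermark(4); // promise: nothing with timestamp < 4 is still coming
        System.out.println(op.emitted()); // only the complete window [0, 4)
    }
}
```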
> >>>>> For measuring throughput I did the following: The source emits tuples
> >> of
> >>>>> the form ("tuple", 1) in an infinite loop. The window operator sums
> up
> >>>> the
> >>>>> tuples, thereby counting how many tuples the window operator can
> >> handle
> >>>> in
> >>>>> a given time window. There are two different implementations for the
> >>>>> summation: 1) simply summing up the values in a mapWindow(), there
> you
> >>>> get
> >>>>> a List of all tuples and simply iterate over it. 2) using sum(1),
> >> which
> >>>> is
> >>>>> implemented as a reduce() (that uses the pre-reducer optimisations).
> >>>>>
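The two summation variants can be sketched like this: (1) buffers the whole window and folds it at emission time, (2) pre-reduces eagerly so the operator state is a single value. Illustrative code, not the actual Flink operators:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Sketch of the two aggregation styles compared in the benchmark:
// (1) buffer every element and fold the whole window at emission time
//     (like mapWindow over a List), and
// (2) pre-reduce eagerly so the operator state is one accumulated value
//     (like sum(1) backed by a pre-reducer).
// Names are illustrative, not the actual Flink operators.
public class Aggregation {
    // Style 1: O(n) state per window, work deferred to emission.
    static long bufferedSum(List<Long> buffer) {
        long sum = 0;
        for (long v : buffer) sum += v;
        return sum;
    }

    // Style 2: O(1) state per window, work done per element.
    static class PreReducer {
        private final BinaryOperator<Long> reduce;
        private Long acc; // the only state, independent of element count
        PreReducer(BinaryOperator<Long> reduce) { this.reduce = reduce; }
        void add(long v) { acc = (acc == null) ? v : reduce.apply(acc, v); }
        long emit() { long r = acc; acc = null; return r; }
    }

    public static void main(String[] args) {
        List<Long> window = new ArrayList<>();
        PreReducer pre = new PreReducer(Long::sum);
        for (long v = 1; v <= 5; v++) { window.add(v); pre.add(v); }
        System.out.println(bufferedSum(window)); // 15, after buffering 5 elements
        System.out.println(pre.emit());          // 15, with constant-size state
    }
}
```

The constant-size state of style (2) is why the pre-reduced variant keeps scaling with window size while the buffered variant taxes the garbage collector.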
> >>>>> These are the performance numbers (Current is the current
> >> implementation,
> >>>>> Next is my proof-of-concept):
> >>>>>
> >>>>> Tumbling (1 sec):
> >>>>>  - Current/Map: 1.6 mio
> >>>>>  - Current/Reduce: 2 mio
> >>>>>  - Next/Map: 2.2 mio
> >>>>>  - Next/Reduce: 4 mio
> >>>>>
> >>>>> Sliding (5 sec, slide 1 sec):
> >>>>>  - Current/Map: ca 3 mio (fluctuates a lot)
> >>>>>  - Current/Reduce: No output
> >>>>>  - Next/Map: ca 4 mio (fluctuates)
> >>>>>  - Next/Reduce: 10 mio
> >>>>>
> >>>>> The Next/Reduce variant can basically scale indefinitely with window
> >> size
> >>>>> because the internal state does not rely on the number of elements
> >> (it is
> >>>>> just the current sum). The pre-reducer for sliding elements cannot
> >> handle
> >>>>> the amount of tuples; it produces no output. For the two Map variants
> >> the
> >>>>> performance fluctuates because they always keep all the elements in
> an
> >>>>> internal buffer before emission, this seems to tax the garbage
> >> collector
> >>>> a
> >>>>> bit and leads to random pauses.
> >>>>>
> >>>>> One thing that should be noted is that I had to disable the
> >> fake-element
> >>>>> emission thread, otherwise the Current versions would deadlock.
> >>>>>
> >>>>> So, I started working on this because I thought that out-of-order
> >>>>> processing would be necessary for correctness. And it certainly is. But
> >>>>> the proof-of-concept also shows that performance can be greatly improved.
> >>>>>
> >>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <[hidden email]>
> wrote:
> >>>>>>
> >>>>>> I agree lets separate these topics from each other so we can get
> >> faster
> >>>>>> resolution.
> >>>>>>
> >>>>>> There is already a state discussion in the thread we started with
> >> Paris.
> >>>>>>
> >>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <[hidden email]
> >
> >>>>> wrote:
> >>>>>>
> >>>>>>> I agree with supporting out-of-order out of the box :-), even if
> >> this
> >>>>> means
> >>>>>>> a major refactoring. This is the right time to refactor the
> >> streaming
> >>>>> API
> >>>>>>> before we pull it out of beta. I think that this is more important
> >>>> than
> >>>>> new
> >>>>>>> features in the streaming API, which can be prioritized once the
> >> API
> >>>> is
> >>>>> out
> >>>>>>> of beta (meaning, that IMO this is the right time to stall PRs
> >> until
> >>>> we
> >>>>>>> agree on the design).
> >>>>>>>
> >>>>>>> There are three sections in the document: windowing, state, and
> >> API.
> >>>> How
> >>>>>>> convoluted are those with each other? Can we separate the
> >> discussion
> >>>> or
> >>>>> do
> >>>>>>> we need to discuss those all together? I think part of the
> >> difficulty
> >>>> is
> >>>>>>> that we are discussing three design choices at once.
> >>>>>>>
> >>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <
> >> [hidden email]>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Out of order is ubiquitous in the real world. Typically, what
> >>>>> happens is
> >>>>>>>> that businesses will declare a maximum allowable delay for
> >> delayed
> >>>>>>>> transactions and will commit to results when that delay is
> >> reached.
> >>>>>>>> Transactions that arrive later than this cutoff are collected
> >>>>> specially
> >>>>>>> as
> >>>>>>>> corrections which are reported/used when possible.
> >>>>>>>>
> >>>>>>>> Clearly, ordering can also be violated during processing, but if
> >> the
> >>>>> data
> >>>>>>>> is originally out of order the situation can't be repaired by any
> >>>>>>> protocol
> >>>>>>>> fixes that prevent transactions from becoming disordered but has
> >> to be
> >>>>>>> handled
> >>>>>>>> at the data level.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <
> >>>> [hidden email]
> >>>>>>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> I also don't like big changes but sometimes they are necessary.
> >>>> The
> >>>>>>>> reason
> >>>>>>>>> why I'm so adamant about out-of-order processing is that
> >>>>> out-of-order
> >>>>>>>>> elements are not some exception that occurs once in a while;
> >> they
> >>>>> occur
> >>>>>>>>> constantly in a distributed system. For example, in this:
> >>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the
> >>>> resulting
> >>>>>>>>> windows
> >>>>>>>>> are completely bogus because the current windowing system
> >> assumes
> >>>>>>>> elements
> >>>>>>>>> to globally arrive in order, which is simply not true. (The
> >>>> example
> >>>>>>> has a
> >>>>>>>>> source that generates increasing integers. Then these pass
> >>>> through a
> >>>>>>> map
> >>>>>>>>> and are unioned with the original DataStream before a window
> >>>>> operator.)
> >>>>>>>>> This simulates elements arriving from different operators at a
> >>>>>>> windowing
> >>>>>>>>> operator. The example is also DOP=1, I imagine this to get
> >> worse
> >>>>> with
> >>>>>>>>> higher DOP.
> >>>>>>>>>
> >>>>>>>>> What do you mean by costly? As I said, I have a
> >> proof-of-concept
> >>>>>>>> windowing
> >>>>>>>>> operator that can handle out-of-order elements. This is an
> >> example
> >>>>>>> using
> >>>>>>>>> the current Flink API:
> >>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
> >>>>>>>>> (It is an infinite source of tuples and a 5 second window
> >> operator
> >>>>> that
> >>>>>>>>> counts the tuples.) The first problem is that this code
> >> deadlocks
> >>>>>>> because
> >>>>>>>>> of the thread that emits fake elements. If I disable the fake
> >>>>> element
> >>>>>>>> code
> >>>>>>>>> it works, but the throughput using my mockup is 4 times higher.
> >>>> The
> >>>>>>> gap
> >>>>>>>>> widens dramatically if the window size increases.
> >>>>>>>>>
> >>>>>>>>> So, it actually increases performance (unless I'm making a
> >> mistake
> >>>>> in
> >>>>>>> my
> >>>>>>>>> explorations) and can handle elements that arrive out-of-order
> >>>>> (which
> >>>>>>>>> happens basically always in real-world windowing use-cases).
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <[hidden email]>
> >>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> What I like a lot about Aljoscha's proposed design is that we
> >>>>> need no
> >>>>>>>>>> different code for "system time" vs. "event time". It only
> >>>>> differs in
> >>>>>>>>> where
> >>>>>>>>>> the timestamps are assigned.
> >>>>>>>>>>
> >>>>>>>>>> The OOP approach also gives you the semantics of total
> >> ordering
> >>>>>>> without
> >>>>>>>>>> imposing merges on the streams.
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
> >>>>>>>>>> [hidden email]> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> I agree that there should be multiple alternatives the
> >> user(!)
> >>>>> can
> >>>>>>>>>>> choose from. Partial out-of-order processing works for
> >>>> many/most
> >>>>>>>>>>> aggregates. However, if you consider
> >> Event-Pattern-Matching,
> >>>>> global
> >>>>>>>>>>> ordering is necessary (even if the performance penalty
> >> might
> >>>> be
> >>>>>>>> high).
> >>>>>>>>>>>
> >>>>>>>>>>> I would also keep "system-time windows" as an alternative
> >> to
> >>>>>>> "source
> >>>>>>>>>>> assigned ts-windows".
> >>>>>>>>>>>
> >>>>>>>>>>> It might also be interesting to consider the following
> >> paper
> >>>> for
> >>>>>>>>>>> overlapping windows: "Resource sharing in continuous
> >>>>> sliding-window
> >>>>>>>>>>> aggregates"
> >>>>>>>>>>>
> >>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
> >>>>>>>>>>>> Hey
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think we should not block PRs unnecessarily if your
> >>>>> suggested
> >>>>>>>>> changes
> >>>>>>>>>>>> might touch them at some point.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Also I still think we should not put everything in the
> >>>>> Datastream
> >>>>>>>>>> because
> >>>>>>>>>>>> it will be a huge mess.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Also we need to agree on the out of order processing,
> >>>> whether
> >>>>> we
> >>>>>>>> want
> >>>>>>>>>> it
> >>>>>>>>>>>> the way you proposed it (which is quite costly). Another
> >>>>>>> alternative
> >>>>>>>>>>>> approach there which fits in the current windowing is to
> >>>>> filter
> >>>>>>> out
> >>>>>>>>> of
> >>>>>>>>>>>> order events and apply a special handling operator on
> >> them.
> >>>>> This
> >>>>>>>>> would
> >>>>>>>>>> be
> >>>>>>>>>>>> fairly lightweight.
> >>>>>>>>>>>>
> >>>>>>>>>>>> My point is that we need to consider some alternative
> >>>>> solutions.
> >>>>>>>> And
> >>>>>>>>> we
> >>>>>>>>>>>> should not block contributions along the way.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers
> >>>>>>>>>>>> Gyula
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <
> >>>>>>>>> [hidden email]>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> The reason I posted this now is that we need to think
> >> about
> >>>>> the
> >>>>>>>> API
> >>>>>>>>>> and
> >>>>>>>>>>>>> windowing before proceeding with the PRs of Gabor
> >> (inverse
> >>>>>>> reduce)
> >>>>>>>>> and
> >>>>>>>>>>>>> Gyula (removal of "aggregate" functions on DataStream).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For the windowing, I think that the current model does
> >> not
> >>>>> work
> >>>>>>>> for
> >>>>>>>>>>>>> out-of-order processing. Therefore, the whole windowing
> >>>>>>>>> infrastructure
> >>>>>>>>>>> will
> >>>>>>>>>>>>> basically have to be redone. Meaning also that any work
> >> on
> >>>>> the
> >>>>>>>>>>>>> pre-aggregators or optimizations that we do now becomes
> >>>>> useless.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For the API, I proposed to restructure the interactions
> >>>>> between
> >>>>>>>> all
> >>>>>>>>>> the
> >>>>>>>>>>>>> different *DataStream classes and grouping/windowing.
> >> (See
> >>>>> API
> >>>>>>>>> section
> >>>>>>>>>>> of
> >>>>>>>>>>>>> the doc I posted.)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <
> >>>> [hidden email]
> >>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Aljoscha,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the nice summary, this is a very good
> >>>> initiative.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I added some comments to the respective sections
> >> (where I
> >>>>> didnt
> >>>>>>>>> fully
> >>>>>>>>>>>>> agree
> >>>>>>>>>>>>>> :).).
> >>>>>>>>>>>>>> At some point I think it would be good to have a public
> >>>>> hangout
> >>>>>>>>>> session
> >>>>>>>>>>>>> on
> >>>>>>>>>>>>>> this, which could make a more dynamic discussion.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>> Gyula
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Aljoscha Krettek <[hidden email]> ezt írta
> >> (időpont:
> >>>>>>> 2015.
> >>>>>>>>> jún.
> >>>>>>>>>>>>> 22.,
> >>>>>>>>>>>>>> H, 21:34):
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>> with people proposing changes to the streaming part I
> >>>> also
> >>>>>>>> wanted
> >>>>>>>>> to
> >>>>>>>>>>>>>> throw
> >>>>>>>>>>>>>>> my hat into the ring. :D
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> During the last few months, while I was getting
> >>>> acquainted
> >>>>>>> with
> >>>>>>>>> the
> >>>>>>>>>>>>>>> streaming system, I wrote down some thoughts I had
> >> about
> >>>>> how
> >>>>>>>>> things
> >>>>>>>>>>>>> could
> >>>>>>>>>>>>>>> be improved. Hopefully, they are in somewhat coherent
> >>>> shape
> >>>>>>> now,
> >>>>>>>>> so
> >>>>>>>>>>>>>> please
> >>>>>>>>>>>>>>> have a look if you are interested in this:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This mostly covers:
> >>>>>>>>>>>>>>>  - Timestamps assigned at sources
> >>>>>>>>>>>>>>>  - Out-of-order processing of elements in window
> >>>> operators
> >>>>>>>>>>>>>>>  - API design
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Please let me know what you think. Comment in the
> >>>> document
> >>>>> or
> >>>>>>>> here
> >>>>>>>>>> in
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> mailing list.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I have a PR in the makings that would introduce source
> >>>>>>>> timestamps
> >>>>>>>>>> and
> >>>>>>>>>>>>>>> watermarks for keeping track of them. I also hacked a
> >>>>>>>>>> proof-of-concept
> >>>>>>>>>>>>>> of a
> >>>>>>>>>>>>>>> windowing system that is able to process out-of-order
> >>>>> elements
> >>>>>>>>>> using a
> >>>>>>>>>>>>>>> FlatMap operator. (It uses panes to perform efficient
> >>>>>>>>>>>>> pre-aggregations.)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> >
>
>

Re: Thoughts About Streaming

Matthias J. Sax
Sure. I picked this up. Using the current model for "occurrence time
semantics" does not work.

I have elaborated on this many times in the past (but nobody cared). It is
important to make it clear to the user what semantics are supported.
Claiming to support "sliding windows" doesn't mean anything; there are
too many different semantics out there. :)
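The two semantics can be stated precisely: keying the window on arrival time answers "what did I *receive* in the last 5 seconds", while keying it on event time answers "what *occurred* in the last 5 seconds". A small sketch over the same timestamped data (the record layout and names are illustrative):

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the two window semantics discussed above, over the same data:
// "sum of events *received* in the last 5 seconds" keys on arrival time,
// "sum of events that *occurred* in the last 5 seconds" keys on event time.
// The record layout and names are illustrative.
public class WindowSemantics {
    static final class Event {
        final long eventTime, arrivalTime, value;
        // An element delayed in transit has arrivalTime > eventTime.
        Event(long eventTime, long arrivalTime, long value) {
            this.eventTime = eventTime;
            this.arrivalTime = arrivalTime;
            this.value = value;
        }
    }

    // Sum of values in the window [windowEnd - size, windowEnd), keyed on
    // arrival time or event time depending on `byArrival`.
    static long sumBy(List<Event> events, long windowEnd, long size, boolean byArrival) {
        long sum = 0;
        for (Event e : events) {
            long t = byArrival ? e.arrivalTime : e.eventTime;
            if (t >= windowEnd - size && t < windowEnd) sum += e.value;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1, 1, 10),  // on time
                new Event(2, 7, 20),  // occurred early, arrived late
                new Event(6, 6, 30));
        // Window [5, 10): the late element counts by arrival time, but
        // belongs to the earlier window by occurrence time.
        System.out.println(sumBy(events, 10, 5, true));  // received: 20 + 30 = 50
        System.out.println(sumBy(events, 10, 5, false)); // occurred: 30
    }
}
```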


On 06/25/2015 03:35 PM, Aljoscha Krettek wrote:

> Yes, I am aware of this requirement and it would also be supported in my
> proposed model.
>
> The problem is that the "custom timestamp" feature gives the impression
> that the elements are windowed according to a user-assigned timestamp. The
> results, however, are wrong because of the assumption that elements
> arrive in order. (This is what I was trying to show with my fancy ASCII
> art and result output.)
>
> On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <[hidden email]>
> wrote:
>
>> Hi Aljoscha,
>>
>> I like that you are pushing in this direction. However, IMHO you
>> misinterpreter the current approach. It does not assume that tuples
>> arrive in-order; the current approach has no notion about a
>> pre-defined-order (for example, the order in which the event are
>> created). There is only the notion of "arrival-order" at the operator.
>> From this "arrival-order" perspective, the result are correct(!).
>>
>> Windowing in the current approach means for example, "sum up an
>> attribute of all events you *received* in the last 5 seconds". That is a
>> different meaning that "sum up an attribute of all event that *occurred*
>> in the last 5 seconds". Both queries are valid and Flink should support
>> both IMHO.
>>
>>
>> -Matthias
>>
>>
>>
>> On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
>>> Yes, now this also processes about 3 mio Elements (Window Size 5 sec,
>> Slide
>>> 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio.
>>>
>>> Performance is not my main concern, however. My concern is that the
>> current
>>> model assumes elements to arrive in order, which is simply not true.
>>>
>>> In your code you have these lines for specifying the window:
>>> .window(Time.of(1l, TimeUnit.SECONDS))
>>> .every(Time.of(1l, TimeUnit.SECONDS))
>>>
>>> Although this semantically specifies a tumbling window of size 1 sec I'm
>>> afraid it uses the sliding window logic internally (because of the
>>> .every()).
>>>
>>> In my tests I only have the first line.
>>>
>>>
>>> On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <[hidden email]> wrote:
>>>
>>>> I'm very sorry, I had a bug in the InversePreReducer. It should be
>>>> fixed now. Can you please run it again?
>>>>
>>>> I also tried to reproduce some of your performance numbers, but I'm
>>>> getting only less than 1/10th of yours. For example, in the Tumbling
>>>> case, Current/Reduce produces only ~100000 for me. Do you have any
>>>> idea what I could be doing wrong? My code:
>>>> http://pastebin.com/zbEjmGhk
>>>> I am running it on a 2 GHz Core i7.
>>>>
>>>> Best regards,
>>>> Gabor
>>>>
>>>>
>>>> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <[hidden email]>:
>>>>> Hi,
>>>>> I also ran the tests on top of PR 856 (inverse reducer) now. The
>> results
>>>>> seem incorrect. When I insert a Thread.sleep(1) in the tuple source,
>> all
>>>>> the previous tests reported around 3600 tuples (Size 5 sec, Slide 1
>> sec)
>>>>> (Theoretically there would be 5000 tuples in 5 seconds but this is due
>> to
>>>>> overhead). These are the results for the inverse reduce optimisation:
>>>>> (Tuple 0,38)
>>>>> (Tuple 0,829)
>>>>> (Tuple 0,1625)
>>>>> (Tuple 0,2424)
>>>>> (Tuple 0,3190)
>>>>> (Tuple 0,3198)
>>>>> (Tuple 0,-339368)
>>>>> (Tuple 0,-1315725)
>>>>> (Tuple 0,-2932932)
>>>>> (Tuple 0,-5082735)
>>>>> (Tuple 0,-7743256)
>>>>> (Tuple 0,75701046)
>>>>> (Tuple 0,642829470)
>>>>> (Tuple 0,2242018381)
>>>>> (Tuple 0,5190708618)
>>>>> (Tuple 0,10060360311)
>>>>> (Tuple 0,-94254951)
>>>>> (Tuple 0,-219806321293)
>>>>> (Tuple 0,-1258895232699)
>>>>> (Tuple 0,-4074432596329)
>>>>>
>>>>> One line is one emitted window count. This is what happens when I
>> remove
>>>>> the Thread.sleep(1):
>>>>> (Tuple 0,660676)
>>>>> (Tuple 0,2553733)
>>>>> (Tuple 0,3542696)
>>>>> (Tuple 0,1)
>>>>> (Tuple 0,1107035)
>>>>> (Tuple 0,2549491)
>>>>> (Tuple 0,4100387)
>>>>> (Tuple 0,-8406583360092)
>>>>> (Tuple 0,-8406582150743)
>>>>> (Tuple 0,-8406580427190)
>>>>> (Tuple 0,-8406580427190)
>>>>> (Tuple 0,-8406580427190)
>>>>> (Tuple 0,6847279255682044995)
>>>>> (Tuple 0,6847279255682044995)
>>>>> (Tuple 0,-5390528042713628318)
>>>>> (Tuple 0,-5390528042711551780)
>>>>> (Tuple 0,-5390528042711551780)
>>>>>
>>>>> So at some point the pre-reducer seems to go haywire and does not
>> recover
>>>>> from it. The good thing is that it does produce results now, where the
>>>>> previous Current/Reduce would simply hang and not produce any output.
>>>>>
>>>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <[hidden email]> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Aljoscha, can you please try the performance test of Current/Reduce
>>>>>> with the InversePreReducer in PR 856? (If you just call sum, it will
>>>>>> use an InversePreReducer.) It would be an interesting test, because
>>>>>> the inverse function optimization really depends on the stream being
>>>>>> ordered, and I think it has the potential of being faster then
>>>>>> Next/Reduce. Especially if the window size is much larger than the
>>>>>> slide size.
>>>>>>
>>>>>> Best regards,
>>>>>> Gabor
>>>>>>
>>>>>>
>>>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <[hidden email]>:
>>>>>>> I think I'll have to elaborate a bit so I created a proof-of-concept
>>>>>>> implementation of my Ideas and ran some throughput measurements to
>>>>>>> alleviate concerns about performance.
>>>>>>>
>>>>>>> First, though, I want to highlight again why the current approach
>> does
>>>>>> not
>>>>>>> work with out-of-order elements (which, again, occur constantly due
>> to
>>>>>> the
>>>>>>> distributed nature of the system). This is the example I posted
>>>> earlier:
>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. The plan
>> looks
>>>>>> like
>>>>>>> this:
>>>>>>>
>>>>>>> +--+
>>>>>>> | | Source
>>>>>>> +--+
>>>>>>> |
>>>>>>> +-----+
>>>>>>> | |
>>>>>>> | +--+
>>>>>>> | | | Identity Map
>>>>>>> | +--+
>>>>>>> | |
>>>>>>> +-----+
>>>>>>> |
>>>>>>> +--+
>>>>>>> | | Window
>>>>>>> +--+
>>>>>>> |
>>>>>>> |
>>>>>>> +--+
>>>>>>> | | Sink
>>>>>>> +--+
>>>>>>>
>>>>>>> So all it does is pass the elements through an identity map and then
>>>>>> merge
>>>>>>> them again before the window operator. The source emits ascending
>>>>>> integers
>>>>>>> and the window operator has a custom timestamp extractor that uses
>> the
>>>>>>> integer itself as the timestamp and should create windows of size 4
>>>> (that
>>>>>>> is elements with timestamp 0-3 are one window, the next are the
>>>> elements
>>>>>>> with timestamp 4-8, and so on). Since the topology basically doubles
>>>> the
>>>>>>> elements form the source I would expect to get these windows:
>>>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3
>>>>>>> Window: 4, 4, 6, 6, 7, 7, 8, 8
>>>>>>>
>>>>>>> The output is this, however:
>>>>>>> Window: 0, 1, 2, 3,
>>>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
>>>>>>> Window: 8, 9, 10, 11,
>>>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
>>>>>>> Window: 16, 17, 18, 19,
>>>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
>>>>>>> Window: 24, 25, 26, 27,
>>>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
>>>>>>>
>>>>>>> The reason is that the elements simply arrive out-of-order. Imagine
>>>> what
>>>>>>> would happen if the elements actually arrived with some delay from
>>>>>>> different operations.
>>>>>>>
>>>>>>> Now, on to the performance numbers. The proof-of-concept I created is
>>>>>>> available here:
>>>>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock.
>> The
>>>>>> basic
>>>>>>> idea is that sources assign the current timestamp when emitting
>>>> elements.
>>>>>>> They also periodically emit watermarks that tell us that no elements
>>>> with
>>>>>>> an earlier timestamp will be emitted. The watermarks propagate
>> through
>>>>>> the
>>>>>>> operators. The window operator looks at the timestamp of an element
>>>> and
>>>>>>> puts it into the buffer that corresponds to that window. When the
>>>> window
>>>>>>> operator receives a watermark it will look at the in-flight windows
>>>>>>> (basically the buffers) and emit those windows where the window-end
>> is
>>>>>>> before the watermark.
>>>>>>>
>>>>>>> For measuring throughput I did the following: The source emits tuples
>>>> of
>>>>>>> the form ("tuple", 1) in an infinite loop. The window operator sums
>> up
>>>>>> the
>>>>>>> tuples, thereby counting how many tuples the window operator can
>>>> handle
>>>>>> in
>>>>>>> a given time window. There are two different implementations for the
>>>>>>> summation: 1) simply summing up the values in a mapWindow(), there
>> you
>>>>>> get
>>>>>>> a List of all tuples and simple iterate over it. 2) using sum(1),
>>>> which
>>>>>> is
>>>>>>> implemented as a reduce() (that uses the pre-reducer optimisations).
>>>>>>>
>>>>>>> These are the performance numbers (Current is the current
>>>> implementation,
>>>>>>> Next is my proof-of-concept):
>>>>>>>
>>>>>>> Tumbling (1 sec):
>>>>>>>  - Current/Map: 1.6 mio
>>>>>>>  - Current/Reduce: 2 mio
>>>>>>>  - Next/Map: 2.2 mio
>>>>>>>  - Next/Reduce: 4 mio
>>>>>>>
>>>>>>> Sliding (5 sec, slide 1 sec):
>>>>>>>  - Current/Map: ca 3 mio (fluctuates a lot)
>>>>>>>  - Current/Reduce: No output
>>>>>>>  - Next/Map: ca 4 mio (fluctuates)
>>>>>>>  - Next/Reduce: 10 mio
>>>>>>>
>>>>>>> The Next/Reduce variant can basically scale indefinitely with window
>>>> size
>>>>>>> because the internal state does not rely on the number of elements
>>>> (it is
>>>>>>> just the current sum). The pre-reducer for sliding elements cannot
>>>> handle
>>>>>>> the amount of tuples, it produces no output. For the two Map variants
>>>> the
>>>>>>> performance fluctuates because they always keep all the elements in
>> an
>>>>>>> internal buffer before emission, this seems to tax the garbage
>>>> collector
>>>>>> a
>>>>>>> bit and leads to random pauses.
>>>>>>>
>>>>>>> One thing that should be noted is that I had to disable the
>>>> fake-element
>>>>>>> emission thread, otherwise the Current versions would deadlock.
>>>>>>>
>>>>>>> So, I started working on this because I thought that out-of-order
>>>>>>> processing would be necessary for correctness. And it is certainly,
>>>> But
>>>>>> the
>>>>>>> proof-of-concept also shows that performance can be greatly improved.
>>>>>>>
>>>>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <[hidden email]>
>> wrote:
>>>>>>>>
>>>>>>>> I agree lets separate these topics from each other so we can get
>>>> faster
>>>>>>>> resolution.
>>>>>>>>
>>>>>>>> There is already a state discussion in the thread we started with
>>>> Paris.
>>>>>>>>
>>>>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <[hidden email]
>>>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I agree with supporting out-of-order out of the box :-), even if
>>>> this
>>>>>>> means
>>>>>>>>> a major refactoring. This is the right time to refactor the
>>>> streaming
>>>>>>> API
>>>>>>>>> before we pull it out of beta. I think that this is more important
>>>>>> than
>>>>>>> new
>>>>>>>>> features in the streaming API, which can be prioritized once the
>>>> API
>>>>>> is
>>>>>>> out
>>>>>>>>> of beta (meaning, that IMO this is the right time to stall PRs
>>>> until
>>>>>> we
>>>>>>>>> agree on the design).
>>>>>>>>>
>>>>>>>>> There are three sections in the document: windowing, state, and
>>>> API.
>>>>>> How
>>>>>>>>> convoluted are those with each other? Can we separate the
>>>> discussion
>>>>>> or
>>>>>>> do
>>>>>>>>> we need to discuss those all together? I think part of the
>>>> difficulty
>>>>>> is
>>>>>>>>> that we are discussing three design choices at once.
>>>>>>>>>
>>>>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <
>>>> [hidden email]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Out of order is ubiquitous in the real-world.  Typically, what
>>>>>>> happens is
>>>>>>>>>> that businesses will declare a maximum allowable delay for
>>>> delayed
>>>>>>>>>> transactions and will commit to results when that delay is
>>>> reached.
>>>>>>>>>> Transactions that arrive later than this cutoff are collected
>>>>>>> specially
>>>>>>>>> as
>>>>>>>>>> corrections which are reported/used when possible.
>>>>>>>>>>
>>>>>>>>>> Clearly, ordering can also be violated during processing, but if
>>>> the
>>>>>>> data
>>>>>>>>>> is originally out of order the situation can't be repaired by any
>>>>>>>>> protocol
>>>>>>>>>> fixes that prevent transactions from becoming disordered but has
>>>> to
>>>>>>>>> handled
>>>>>>>>>> at the data level.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <
>>>>>> [hidden email]
>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I also don't like big changes but sometimes they are necessary.
>>>>>> The
>>>>>>>>>> reason
>>>>>>>>>>> why I'm so adamant about out-of-order processing is that
>>>>>>> out-of-order
>>>>>>>>>>> elements are not some exception that occurs once in a while;
>>>> they
>>>>>>> occur
>>>>>>>>>>> constantly in a distributed system. For example, in this:
>>>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the
>>>>>> resulting
>>>>>>>>>>> windows
>>>>>>>>>>> are completely bogus because the current windowing system
>>>> assumes
>>>>>>>>>> elements
>>>>>>>>>>> to globally arrive in order, which is simply not true. (The
>>>>>> example
>>>>>>>>> has a
>>>>>>>>>>> source that generates increasing integers. Then these pass
>>>>>> through a
>>>>>>>>> map
>>>>>>>>>>> and are unioned with the original DataStream before a window
>>>>>>> operator.)
>>>>>>>>>>> This simulates elements arriving from different operators at a
>>>>>>>>> windowing
>>>>>>>>>>> operator. The example is also DOP=1, I imagine this to get
>>>> worse
>>>>>>> with
>>>>>>>>>>> higher DOP.
>>>>>>>>>>>
>>>>>>>>>>> What do you mean by costly? As I said, I have a
>>>> proof-of-concept
>>>>>>>>>> windowing
>>>>>>>>>>> operator that can handle out-or-order elements. This is an
>>>> example
>>>>>>>>> using
>>>>>>>>>>> the current Flink API:
>>>>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
>>>>>>>>>>> (It is an infinite source of tuples and a 5 second window
>>>> operator
>>>>>>> that
>>>>>>>>>>> counts the tuples.) The first problem is that this code
>>>> deadlocks
>>>>>>>>> because
>>>>>>>>>>> of the thread that emits fake elements. If I disable the fake
>>>>>>> element
>>>>>>>>>> code
>>>>>>>>>>> it works, but the throughput using my mockup is 4 times higher
>>>> .
>>>>>> The
>>>>>>>>> gap
>>>>>>>>>>> widens dramatically if the window size increases.
>>>>>>>>>>>
>>>>>>>>>>> So, it actually increases performance (unless I'm making a
>>>> mistake
>>>>>>> in
>>>>>>>>> my
>>>>>>>>>>> explorations) and can handle elements that arrive out-of-order
>>>>>>> (which
>>>>>>>>>>> happens basically always in a real-world windowing use-cases).
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <[hidden email]>
>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What I like a lot about Aljoscha's proposed design is that we
>>>>>>> need no
>>>>>>>>>>>> different code for "system time" vs. "event time". It only
>>>>>>> differs in
>>>>>>>>>>> where
>>>>>>>>>>>> the timestamps are assigned.
>>>>>>>>>>>>
>>>>>>>>>>>> The OOP approach also gives you the semantics of total
>>>> ordering
>>>>>>>>> without
>>>>>>>>>>>> imposing merges on the streams.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
>>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I agree that there should be multiple alternatives the
>>>> user(!)
>>>>>>> can
>>>>>>>>>>>>> choose from. Partial out-of-order processing works for
>>>>>> many/most
>>>>>>>>>>>>> aggregates. However, if you consider
>>>> Event-Pattern-Matching,
>>>>>>> global
>>>>>>>>>>>>> ordering in necessary (even if the performance penalty
>>>> might
>>>>>> be
>>>>>>>>>> high).
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would also keep "system-time windows" as an alternative
>>>> to
>>>>>>>>> "source
>>>>>>>>>>>>> assigned ts-windows".
>>>>>>>>>>>>>
>>>>>>>>>>>>> It might also be interesting to consider the following
>>>> paper
>>>>>> for
>>>>>>>>>>>>> overlapping windows: "Resource sharing in continuous
>>>>>>> sliding-window
>>>>>>>>>>>>> aggregates"
>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>>>>>>>>>>>>>> Hey
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think we should not block PRs unnecessarily if your
>>>>>>> suggested
>>>>>>>>>>> changes
>>>>>>>>>>>>>> might touch them at some point.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also I still think we should not put everything in the
>>>>>>> Datastream
>>>>>>>>>>>> because
>>>>>>>>>>>>>> it will be a huge mess.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also we need to agree on the out of order processing,
>>>>>> whether
>>>>>>> we
>>>>>>>>>> want
>>>>>>>>>>>> it
>>>>>>>>>>>>>> the way you proposed it(which is quite costly). Another
>>>>>>>>> alternative
>>>>>>>>>>>>>> approach there which fits in the current windowing is to
>>>>>>> filter
>>>>>>>>> out
>>>>>>>>>>> if
>>>>>>>>>>>>>> order events and apply a special handling operator on
>>>> them.
>>>>>>> This
>>>>>>>>>>> would
>>>>>>>>>>>> be
>>>>>>>>>>>>>> fairly lightweight.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> My point is that we need to consider some alternative
>>>>>>> solutions.
>>>>>>>>>> And
>>>>>>>>>>> we
>>>>>>>>>>>>>> should not block contributions along the way.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <
>>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The reason I posted this now is that we need to think
>>>> about
>>>>>>> the
>>>>>>>>>> API
>>>>>>>>>>>> and
>>>>>>>>>>>>>>> windowing before proceeding with the PRs of Gabor
>>>> (inverse
>>>>>>>>> reduce)
>>>>>>>>>>> and
>>>>>>>>>>>>>>> Gyula (removal of "aggregate" functions on DataStream).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For the windowing, I think that the current model does
>>>> not
>>>>>>> work
>>>>>>>>>> for
>>>>>>>>>>>>>>> out-of-order processing. Therefore, the whole windowing
>>>>>>>>>>> infrastructure
>>>>>>>>>>>>> will
>>>>>>>>>>>>>>> basically have to be redone. Meaning also that any work
>>>> on
>>>>>>> the
>>>>>>>>>>>>>>> pre-aggregators or optimizations that we do now becomes
>>>>>>> useless.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For the API, I proposed to restructure the interactions
>>>>>>> between
>>>>>>>>>> all
>>>>>>>>>>>> the
>>>>>>>>>>>>>>> different *DataStream classes and grouping/windowing.
>>>> (See
>>>>>>> API
>>>>>>>>>>> section
>>>>>>>>>>>>> of
>>>>>>>>>>>>>>> the doc I posted.)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <
>>>>>> [hidden email]
>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the nice summary, this is a very good
>>>>>> initiative.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I added some comments to the respective sections
>>>> (where I
>>>>>>> didnt
>>>>>>>>>>> fully
>>>>>>>>>>>>>>> agree
>>>>>>>>>>>>>>>> :).).
>>>>>>>>>>>>>>>> At some point I think it would be good to have a public
>>>>>>> hangout
>>>>>>>>>>>> session
>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>> this, which could make a more dynamic discussion.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Aljoscha Krettek <[hidden email]> ezt írta
>>>> (időpont:
>>>>>>>>> 2015.
>>>>>>>>>>> jún.
>>>>>>>>>>>>>>> 22.,
>>>>>>>>>>>>>>>> H, 21:34):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>> with people proposing changes to the streaming part I
>>>>>> also
>>>>>>>>>> wanted
>>>>>>>>>>> to
>>>>>>>>>>>>>>>> throw
>>>>>>>>>>>>>>>>> my hat into the ring. :D
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> During the last few months, while I was getting
>>>>>> acquainted
>>>>>>>>> with
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> streaming system, I wrote down some thoughts I had
>>>> about
>>>>>>> how
>>>>>>>>>>> things
>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>> be improved. Hopefully, they are in somewhat coherent
>>>>>> shape
>>>>>>>>> now,
>>>>>>>>>>> so
>>>>>>>>>>>>>>>> please
>>>>>>>>>>>>>>>>> have a look if you are interested in this:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This mostly covers:
>>>>>>>>>>>>>>>>>  - Timestamps assigned at sources
>>>>>>>>>>>>>>>>>  - Out-of-order processing of elements in window
>>>>>> operators
>>>>>>>>>>>>>>>>>  - API design
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Please let me know what you think. Comment in the
>>>>>> document
>>>>>>> or
>>>>>>>>>> here
>>>>>>>>>>>> in
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> mailing list.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have a PR in the makings that would introduce source
>>>>>>>>>> timestamps
>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> watermarks for keeping track of them. I also hacked a
>>>>>>>>>>>> proof-of-concept
>>>>>>>>>>>>>>>> of a
>>>>>>>>>>>>>>>>> windowing system that is able to process out-of-order
>>>>>>> elements
>>>>>>>>>>>> using a
>>>>>>>>>>>>>>>>> FlatMap operator. (It uses panes to perform efficient
>>>>>>>>>>>>>>> pre-aggregations.)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>
>>>
>>
>>
>

