Hi,

with people proposing changes to the streaming part, I also wanted to throw my hat into the ring. :D

During the last few months, while I was getting acquainted with the streaming system, I wrote down some thoughts about how things could be improved. Hopefully they are in somewhat coherent shape now, so please have a look if you are interested in this: https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing

This mostly covers:
- Timestamps assigned at sources
- Out-of-order processing of elements in window operators
- API design

Please let me know what you think. Comment in the document or here on the mailing list.

I have a PR in the making that would introduce source timestamps and watermarks for keeping track of them. I also hacked a proof-of-concept of a windowing system that is able to process out-of-order elements using a FlatMap operator. (It uses panes to perform efficient pre-aggregations.)

Cheers,
Aljoscha
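The proposal itself lives in the linked document; as a purely hypothetical sketch (not the actual PR) of the two mechanisms the message names, source-assigned timestamps tracked by watermarks and panes used for pre-aggregation, here is a minimal model in Python. All names and the pane/window sizes are made up for illustration:

```python
# Hypothetical sketch (not the actual PR): a window operator that
# pre-aggregates each element into a small "pane" keyed by its event
# timestamp, and uses watermarks to decide when a window is complete.

PANE_SIZE = 1    # seconds per pane; a window is a whole number of panes
WINDOW_SIZE = 5  # seconds per tumbling window

class PanedWindowOperator:
    def __init__(self):
        self.panes = {}    # pane start time -> pre-aggregated count
        self.results = []  # (window_start, count) emitted so far

    def element(self, timestamp):
        # Pre-aggregate immediately into the element's pane,
        # regardless of arrival order.
        pane = (timestamp // PANE_SIZE) * PANE_SIZE
        self.panes[pane] = self.panes.get(pane, 0) + 1

    def watermark(self, wm):
        # A watermark promises that no element with timestamp < wm is
        # still in flight, so every window ending at or before wm can
        # be assembled from its panes and emitted.
        done = {}
        for pane, count in list(self.panes.items()):
            window_start = (pane // WINDOW_SIZE) * WINDOW_SIZE
            if window_start + WINDOW_SIZE <= wm:
                done[window_start] = done.get(window_start, 0) + count
                del self.panes[pane]
        for w in sorted(done):
            self.results.append((w, done[w]))

op = PanedWindowOperator()
for ts in [0, 1, 4, 3, 2, 6]:  # note: elements arrive out of order
    op.element(ts)
op.watermark(5)                # window [0, 5) is now complete
```

The point of the panes is that out-of-order elements cost nothing extra: they are folded into their pane on arrival, and only the watermark decides when a window's panes are summed up.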
Hi Aljoscha,

Thanks for the nice summary, this is a very good initiative. I added some comments to the respective sections (where I didn't fully agree :)).

At some point I think it would be good to have a public hangout session on this, which could make for a more dynamic discussion.

Cheers,
Gyula

Aljoscha Krettek <[hidden email]> wrote (Mon, 22 Jun 2015, 21:34):
> [...]
The reason I posted this now is that we need to think about the API and windowing before proceeding with the PRs of Gabor (inverse reduce) and Gyula (removal of "aggregate" functions on DataStream).

For the windowing, I think that the current model does not work for out-of-order processing. Therefore, the whole windowing infrastructure will basically have to be redone, which also means that any work on the pre-aggregators or optimizations that we do now becomes useless.

For the API, I proposed restructuring the interactions between all the different *DataStream classes and grouping/windowing. (See the API section of the doc I posted.)

On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <[hidden email]> wrote:
> [...]
For the windowing designs, we should also keep in mind what requirements we have on the way we keep/store the elements (in external stores, Flink managed memory, ...).

On Tue, Jun 23, 2015 at 9:55 AM, Aljoscha Krettek <[hidden email]> wrote:
> [...]
Hey,

I think we should not block PRs unnecessarily just because your suggested changes might touch them at some point.

Also, I still think we should not put everything in the DataStream, because it will be a huge mess.

We also need to agree on the out-of-order processing: whether we want it the way you proposed it (which is quite costly). An alternative approach that fits into the current windowing is to filter out the out-of-order events and apply a special handling operator to them. This would be fairly lightweight.

My point is that we need to consider some alternative solutions, and we should not block contributions along the way.

Cheers,
Gyula

On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <[hidden email]> wrote:
> [...]
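The filtering alternative is not worked out further in the thread; one hypothetical way to read it is a splitter in front of the existing (order-assuming) windowing that diverts late elements to a side path. The function name and representation here are invented for illustration:

```python
# Hypothetical sketch of the "filter out out-of-order events" idea:
# an element whose timestamp is older than the maximum seen so far
# is diverted to a special handling path instead of entering the
# window operator, which can then keep assuming in-order input.

def split_out_of_order(stream):
    max_seen = float("-inf")
    in_order, late = [], []
    for ts, value in stream:
        if ts >= max_seen:
            max_seen = ts
            in_order.append((ts, value))   # goes to the normal windowing
        else:
            late.append((ts, value))       # goes to a special operator
    return in_order, late

in_order, late = split_out_of_order([(1, "a"), (2, "b"), (1, "c"), (3, "d")])
```

The trade-off the thread is debating is visible even in this toy: the main path stays cheap, but late elements never make it into their "correct" window.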
I agree that there should be multiple alternatives the user(!) can choose from. Partial out-of-order processing works for many/most aggregates. However, if you consider event-pattern matching, global ordering is necessary (even if the performance penalty might be high).

I would also keep "system-time windows" as an alternative to "source-assigned ts-windows".

It might also be interesting to consider the following paper for overlapping windows: "Resource sharing in continuous sliding-window aggregates"
https://dl.acm.org/citation.cfm?id=1316720

-Matthias

On 06/23/2015 10:37 AM, Gyula Fóra wrote:
> [...]
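The resource-sharing idea behind that paper (and behind panes generally) can be illustrated with a hypothetical toy, not the paper's actual algorithm: aggregate each small slice of the stream once, then combine the shared slice aggregates into every overlapping window that covers them, instead of re-aggregating each window from raw elements.

```python
# Hypothetical illustration of sharing work across overlapping
# sliding windows: each 1-unit slice is aggregated once, and every
# window is built by combining the slice aggregates it covers.

def sliding_sums(slice_sums, window, slide=1):
    # slice_sums[i] is the partial aggregate (here: a sum) of slice i.
    return [
        sum(slice_sums[i:i + window])
        for i in range(0, len(slice_sums) - window + 1, slide)
    ]

slice_sums = [1, 2, 3, 4, 5]                    # per-slice partial sums
windows = sliding_sums(slice_sums, window=3)    # overlapping windows share slices
```

With window length w and slide 1, each raw element is touched once for its slice rather than w times, which is the kind of saving the paper formalizes.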
What I like a lot about Aljoscha's proposed design is that we need no different code for "system time" vs. "event time". The two only differ in where the timestamps are assigned.

The OOP (out-of-order processing) approach also gives you the semantics of total ordering without imposing merges on the streams.

On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <[hidden email]> wrote:
> [...]
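Stephan's point, that only the timestamp assignment differs, can be made concrete with a hypothetical sketch (the function names are invented; this is not Flink API):

```python
import time

# Hypothetical sketch: the windowing code downstream is identical for
# "system time" and "event time"; only the timestamp extractor differs.

def system_time_extractor(_record):
    return time.time()       # timestamp assigned when the record is seen

def event_time_extractor(record):
    return record["ts"]      # timestamp carried inside the data itself

def assign_timestamps(records, extractor):
    # The window operator only ever sees (timestamp, record) pairs;
    # it cannot tell which notion of time produced them.
    return [(extractor(r), r) for r in records]

events = [{"ts": 10, "v": "a"}, {"ts": 7, "v": "b"}]
with_event_time = assign_timestamps(events, event_time_extractor)
with_system_time = assign_timestamps(events, system_time_extractor)
```

Everything after `assign_timestamps` is shared code, which is exactly the design property being praised.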
I also don't like big changes, but sometimes they are necessary. The reason why I'm so adamant about out-of-order processing is that out-of-order elements are not some exception that occurs once in a while; they occur constantly in a distributed system. For example, in this: https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting windows are completely bogus, because the current windowing system assumes that elements arrive globally in order, which is simply not true. (The example has a source that generates increasing integers. These pass through a map and are unioned with the original DataStream before a window operator.) This simulates elements arriving from different operators at a windowing operator. The example also runs with DOP=1; I imagine this gets worse with higher DOP.

What do you mean by costly? As I said, I have a proof-of-concept windowing operator that can handle out-of-order elements. This is an example using the current Flink API: https://gist.github.com/aljoscha/f8dce0691732e344bbe8. (It is an infinite source of tuples and a 5-second window operator that counts the tuples.) The first problem is that this code deadlocks because of the thread that emits fake elements. If I disable the fake-element code it works, but the throughput using my mockup is 4 times higher. The gap widens dramatically as the window size increases.

So, it actually increases performance (unless I'm making a mistake in my explorations) and can handle elements that arrive out of order (which happens basically always in real-world windowing use cases).

On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <[hidden email]> wrote:
> [...]
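The first gist is a Flink program; as a rough, language-agnostic model (with invented names, and a fixed delay standing in for the map's latency) of why unioning a stream with a mapped copy of itself produces out-of-order arrival even at DOP=1:

```python
# Hypothetical model of the gist's topology: a source of increasing
# integers is unioned with a delayed (mapped) copy of itself. Each
# input arrives in order, but the merged arrival order does not.

def merged_arrival(source, map_delay):
    arrivals = []
    for i, x in enumerate(source):
        arrivals.append((i, x))              # direct path: arrives at step i
        arrivals.append((i + map_delay, x))  # mapped path: arrives later
    arrivals.sort(key=lambda a: a[0])        # interleaving the window op sees
    return [x for _, x in arrivals]

seen = merged_arrival([1, 2, 3, 4], map_delay=2)
```

The merged sequence is not sorted, so a window operator that assumes globally in-order arrival assigns some elements to the wrong (already closed) window, which is the bogus-windows effect described above.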
Out-of-order data is ubiquitous in the real world. Typically, what happens is that businesses declare a maximum allowable delay for delayed transactions and commit to results when that delay is reached. Transactions that arrive later than this cutoff are collected specially as corrections, which are reported/used when possible.

Clearly, ordering can also be violated during processing, but if the data is originally out of order, the situation can't be repaired by any protocol fixes that prevent transactions from becoming disordered; it has to be handled at the data level.

On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <[hidden email]> wrote:
> [...]
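The business practice Ted describes can be sketched hypothetically (the cutoff value, names, and data shapes are invented for illustration):

```python
# Hypothetical sketch of a maximum-allowable-delay cutoff: results
# within the delay are committed; anything arriving after the cutoff
# is collected separately as a correction to be reported later.

MAX_DELAY = 5  # maximum allowable delay, in the same units as timestamps

def process(transactions, now):
    committed, corrections = [], []
    for ts, amount in transactions:
        if now - ts <= MAX_DELAY:
            committed.append((ts, amount))
        else:
            corrections.append((ts, amount))  # reported/used when possible
    return committed, corrections

committed, corrections = process([(10, 100), (3, 50), (9, 25)], now=12)
```

This mirrors the watermark idea at the data level: the system commits once the cutoff passes, and later arrivals become corrections rather than silently wrong window contents.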
I agree with supporting out-of-order processing out of the box :-), even if this means a major refactoring. This is the right time to refactor the streaming API, before we pull it out of beta. I think that this is more important than new features in the streaming API, which can be prioritized once the API is out of beta (meaning that, IMO, this is the right time to stall PRs until we agree on the design).

There are three sections in the document: windowing, state, and API. How convoluted are those with each other? Can we separate the discussions, or do we need to discuss them all together? I think part of the difficulty is that we are discussing three design choices at once.

On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <[hidden email]> wrote:
> [...]
Meaning also that any work on the > > > > >> pre-aggregators or optimizations that we do now becomes useless. > > > > >> > > > > >> For the API, I proposed to restructure the interactions between > all > > > the > > > > >> different *DataStream classes and grouping/windowing. (See API > > section > > > > of > > > > >> the doc I posted.) > > > > >> > > > > >> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <[hidden email]> > > wrote: > > > > >> > > > > >>> Hi Aljoscha, > > > > >>> > > > > >>> Thanks for the nice summary, this is a very good initiative. > > > > >>> > > > > >>> I added some comments to the respective sections (where I didnt > > fully > > > > >> agree > > > > >>> :).). > > > > >>> At some point I think it would be good to have a public hangout > > > session > > > > >> on > > > > >>> this, which could make a more dynamic discussion. > > > > >>> > > > > >>> Cheers, > > > > >>> Gyula > > > > >>> > > > > >>> Aljoscha Krettek <[hidden email]> ezt írta (időpont: 2015. > > jún. > > > > >> 22., > > > > >>> H, 21:34): > > > > >>> > > > > >>>> Hi, > > > > >>>> with people proposing changes to the streaming part I also > wanted > > to > > > > >>> throw > > > > >>>> my hat into the ring. :D > > > > >>>> > > > > >>>> During the last few months, while I was getting acquainted with > > the > > > > >>>> streaming system, I wrote down some thoughts I had about how > > things > > > > >> could > > > > >>>> be improved. Hopefully, they are in somewhat coherent shape now, > > so > > > > >>> please > > > > >>>> have a look if you are interested in this: > > > > >>>> > > > > >>>> > > > > >>> > > > > >> > > > > > > > > > > https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing > > > > >>>> > > > > >>>> This mostly covers: > > > > >>>> - Timestamps assigned at sources > > > > >>>> - Out-of-order processing of elements in window operators > > > > >>>> - API design > > > > >>>> > > > > >>>> Please let me know what you think. 
Comment in the document or > here > > > in > > > > >> the > > > > >>>> mailing list. > > > > >>>> > > > > >>>> I have a PR in the makings that would introduce source > timestamps > > > and > > > > >>>> watermarks for keeping track of them. I also hacked a > > > proof-of-concept > > > > >>> of a > > > > >>>> windowing system that is able to process out-of-order elements > > > using a > > > > >>>> FlatMap operator. (It uses panes to perform efficient > > > > >> pre-aggregations.) > > > > >>>> > > > > >>>> Cheers, > > > > >>>> Aljoscha > > > > >>>> > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > > > |
I agree, let's separate these topics from each other so we can get faster
resolution. There is already a state discussion in the thread we started with Paris. |
I think I'll have to elaborate a bit, so I created a proof-of-concept implementation of my ideas and ran some throughput measurements to address concerns about performance.

First, though, I want to highlight again why the current approach does not work with out-of-order elements (which, again, occur constantly due to the distributed nature of the system). This is the example I posted earlier: https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks like this:

  +--+
  |  | Source
  +--+
    |
 +-----+
 |     |
 |   +--+
 |   |  | Identity Map
 |   +--+
 |     |
 +-----+
    |
  +--+
  |  | Window
  +--+
    |
    |
  +--+
  |  | Sink
  +--+

So all it does is pass the elements through an identity map and then merge them again before the window operator. The source emits ascending integers, and the window operator has a custom timestamp extractor that uses the integer itself as the timestamp and should create windows of size 4 (that is, elements with timestamps 0-3 form one window, the next window holds the elements with timestamps 4-7, and so on). Since the topology basically doubles the elements from the source, I would expect to get these windows:

Window: 0, 0, 1, 1, 2, 2, 3, 3
Window: 4, 4, 5, 5, 6, 6, 7, 7

The output is this, however:

Window: 0, 1, 2, 3,
Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
Window: 8, 9, 10, 11,
Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
Window: 16, 17, 18, 19,
Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
Window: 24, 25, 26, 27,
Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,

The reason is that the elements simply arrive out of order. Imagine what would happen if the elements actually arrived with some delay from different operators.

Now, on to the performance numbers. The proof-of-concept I created is available here: https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The basic idea is that sources assign the current timestamp when emitting elements. They also periodically emit watermarks that tell us that no elements with an earlier timestamp will be emitted. The watermarks propagate through the operators. The window operator looks at the timestamp of an element and puts it into the buffer that corresponds to that window. When the window operator receives a watermark, it looks at the in-flight windows (basically the buffers) and emits those windows whose window-end is before the watermark.

For measuring throughput I did the following: the source emits tuples of the form ("tuple", 1) in an infinite loop, and the window operator sums up the tuples, thereby counting how many tuples it can handle in a given time window. There are two different implementations for the summation: 1) simply summing up the values in a mapWindow(), where you get a List of all tuples and simply iterate over it; 2) using sum(1), which is implemented as a reduce() (that uses the pre-reducer optimisations).

These are the performance numbers (Current is the current implementation, Next is my proof-of-concept):

Tumbling (1 sec):
- Current/Map: 1.6 mio
- Current/Reduce: 2 mio
- Next/Map: 2.2 mio
- Next/Reduce: 4 mio

Sliding (5 sec, slide 1 sec):
- Current/Map: ca. 3 mio (fluctuates a lot)
- Current/Reduce: no output
- Next/Map: ca. 4 mio (fluctuates)
- Next/Reduce: 10 mio

The Next/Reduce variant can basically scale indefinitely with window size because the internal state does not depend on the number of elements (it is just the current sum). The pre-reducer for sliding windows cannot handle the amount of tuples; it produces no output. For the two Map variants the performance fluctuates because they always keep all the elements in an internal buffer before emission; this seems to tax the garbage collector a bit and leads to random pauses.

One thing that should be noted is that I had to disable the fake-element emission thread, otherwise the Current versions would deadlock.

So, I started working on this because I thought that out-of-order processing would be necessary for correctness. And it certainly is. But the proof-of-concept also shows that performance can be greatly improved.
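To make the buffering-and-watermark mechanism concrete, here is a minimal sketch of it, replayed on the union example above. This is not the code from the proof-of-concept branch (which is Flink Java); it is a made-up, language-agnostic illustration in Python, and all names in it (`windowed`, `WINDOW_SIZE`, the event tuples) are invented for this sketch:

```python
from collections import defaultdict

WINDOW_SIZE = 4  # tumbling windows over timestamps 0-3, 4-7, ...

def windowed(events):
    """events: iterable of ("element", ts) or ("watermark", ts).

    Elements are buffered into per-window panes keyed by window start.
    A watermark with timestamp ts promises that no element with a
    timestamp <= ts will arrive later, so every window whose last
    timestamp is <= ts can be emitted and its buffer dropped."""
    panes = defaultdict(list)  # window start -> buffered timestamps
    for kind, ts in events:
        if kind == "element":
            start = (ts // WINDOW_SIZE) * WINDOW_SIZE
            panes[start].append(ts)
        else:  # watermark
            ready = sorted(s for s in panes if s + WINDOW_SIZE - 1 <= ts)
            for start in ready:
                yield sorted(panes.pop(start))

# Simulate the union topology: every integer arrives twice (once via the
# direct path, once via the identity map), with the mapped copies delayed,
# i.e. the merged stream is out of order.
direct = [("element", i) for i in range(8)]
mapped = [("element", i) for i in range(8)]
stream = []
for i in range(0, 8, 4):                 # interleave in bursts of four
    stream += direct[i:i + 4] + mapped[i:i + 4]
stream.append(("watermark", 7))          # all timestamps <= 7 were seen

for window in windowed(stream):
    print(window)
# Despite the out-of-order arrival, the windows are the expected ones:
# [0, 0, 1, 1, 2, 2, 3, 3]
# [4, 4, 5, 5, 6, 6, 7, 7]
```

The point of the sketch is that window membership is decided purely by each element's timestamp, and emission is deferred until the watermark passes the window end, so interleaving from different upstream operators cannot corrupt window contents the way it does in the order-assuming implementation.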
|
Thanks for writing this up and comparing to the current implementation. It's great to see that your mockup indicates correct/expected behaviour *and* better performance. :-)
Regarding the results for the current mechanism: does this problem affects all window operators? – Ufuk On 25 Jun 2015, at 11:36, Aljoscha Krettek <[hidden email]> wrote: > I think I'll have to elaborate a bit so I created a proof-of-concept > implementation of my Ideas and ran some throughput measurements to > alleviate concerns about performance. > > First, though, I want to highlight again why the current approach does not > work with out-of-order elements (which, again, occur constantly due to the > distributed nature of the system). This is the example I posted earlier: > https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks like > this: > > +--+ > | | Source > +--+ > | > +-----+ > | | > | +--+ > | | | Identity Map > | +--+ > | | > +-----+ > | > +--+ > | | Window > +--+ > | > | > +--+ > | | Sink > +--+ > > So all it does is pass the elements through an identity map and then merge > them again before the window operator. The source emits ascending integers > and the window operator has a custom timestamp extractor that uses the > integer itself as the timestamp and should create windows of size 4 (that > is elements with timestamp 0-3 are one window, the next are the elements > with timestamp 4-8, and so on). Since the topology basically doubles the > elements form the source I would expect to get these windows: > Window: 0, 0, 1, 1, 2, 2, 3, 3 > Window: 4, 4, 6, 6, 7, 7, 8, 8 > > The output is this, however: > Window: 0, 1, 2, 3, > Window: 4, 0, 1, 2, 3, 4, 5, 6, 7, > Window: 8, 9, 10, 11, > Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, > Window: 16, 17, 18, 19, > Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, > Window: 24, 25, 26, 27, > Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, > > The reason is that the elements simply arrive out-of-order. Imagine what > would happen if the elements actually arrived with some delay from > different operations. > > Now, on to the performance numbers. 
The proof-of-concept I created is > available here: > https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The basic > idea is that sources assign the current timestamp when emitting elements. > They also periodically emit watermarks that tell us that no elements with > an earlier timestamp will be emitted. The watermarks propagate through the > operators. The window operator looks at the timestamp of an element and > puts it into the buffer that corresponds to that window. When the window > operator receives a watermark it will look at the in-flight windows > (basically the buffers) and emit those windows where the window-end is > before the watermark. > > For measuring throughput I did the following: The source emits tuples of > the form ("tuple", 1) in an infinite loop. The window operator sums up the > tuples, thereby counting how many tuples the window operator can handle in > a given time window. There are two different implementations for the > summation: 1) simply summing up the values in a mapWindow(), there you get > a List of all tuples and simple iterate over it. 2) using sum(1), which is > implemented as a reduce() (that uses the pre-reducer optimisations). > > These are the performance numbers (Current is the current implementation, > Next is my proof-of-concept): > > Tumbling (1 sec): > - Current/Map: 1.6 mio > - Current/Reduce: 2 mio > - Next/Map: 2.2 mio > - Next/Reduce: 4 mio > > Sliding (5 sec, slide 1 sec): > - Current/Map: ca 3 mio (fluctuates a lot) > - Current/Reduce: No output > - Next/Map: ca 4 mio (fluctuates) > - Next/Reduce: 10 mio > > The Next/Reduce variant can basically scale indefinitely with window size > because the internal state does not rely on the number of elements (it is > just the current sum). The pre-reducer for sliding elements cannot handle > the amount of tuples, it produces no output. 
> For the two Map variants the performance fluctuates because they always
> keep all the elements in an internal buffer before emission; this seems to
> tax the garbage collector a bit and leads to random pauses.
>
> One thing that should be noted is that I had to disable the fake-element
> emission thread, otherwise the Current versions would deadlock.
>
> So, I started working on this because I thought that out-of-order
> processing would be necessary for correctness. And it certainly is. But the
> proof-of-concept also shows that performance can be greatly improved.
>
> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <[hidden email]> wrote:
>>
>> I agree, let's separate these topics from each other so we can get faster
>> resolution.
>>
>> There is already a state discussion in the thread we started with Paris.
>>
>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <[hidden email]> wrote:
>>
>>> I agree with supporting out-of-order out of the box :-), even if this
>>> means a major refactoring. This is the right time to refactor the
>>> streaming API before we pull it out of beta. I think that this is more
>>> important than new features in the streaming API, which can be
>>> prioritized once the API is out of beta (meaning that IMO this is the
>>> right time to stall PRs until we agree on the design).
>>>
>>> There are three sections in the document: windowing, state, and API. How
>>> intertwined are those with each other? Can we separate the discussion or
>>> do we need to discuss those all together? I think part of the difficulty
>>> is that we are discussing three design choices at once.
>>>
>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <[hidden email]> wrote:
>>>
>>>> Out-of-order data is ubiquitous in the real world. Typically, what
>>>> happens is that businesses will declare a maximum allowable delay for
>>>> delayed transactions and will commit to results when that delay is
>>>> reached.
>>>> Transactions that arrive later than this cutoff are collected specially
>>>> as corrections which are reported/used when possible.
>>>>
>>>> Clearly, ordering can also be violated during processing, but if the
>>>> data is originally out of order the situation can't be repaired by any
>>>> protocol fixes that prevent transactions from becoming disordered; it
>>>> has to be handled at the data level.
>>>>
>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <[hidden email]> wrote:
>>>>
>>>>> I also don't like big changes but sometimes they are necessary. The
>>>>> reason why I'm so adamant about out-of-order processing is that
>>>>> out-of-order elements are not some exception that occurs once in a
>>>>> while; they occur constantly in a distributed system. For example, in
>>>>> this: https://gist.github.com/aljoscha/a367012646ab98208d27 the
>>>>> resulting windows are completely bogus because the current windowing
>>>>> system assumes elements to globally arrive in order, which is simply
>>>>> not true. (The example has a source that generates increasing
>>>>> integers. Then these pass through a map and are unioned with the
>>>>> original DataStream before a window operator.) This simulates elements
>>>>> arriving from different operators at a windowing operator. The example
>>>>> is also DOP=1; I imagine this gets worse with higher DOP.
>>>>>
>>>>> What do you mean by costly? As I said, I have a proof-of-concept
>>>>> windowing operator that can handle out-of-order elements. This is an
>>>>> example using the current Flink API:
>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
>>>>> (It is an infinite source of tuples and a 5 second window operator
>>>>> that counts the tuples.) The first problem is that this code deadlocks
>>>>> because of the thread that emits fake elements.
>>>>> If I disable the fake element code it works, but the throughput using
>>>>> my mockup is 4 times higher. The gap widens dramatically if the window
>>>>> size increases.
>>>>>
>>>>> So, it actually increases performance (unless I'm making a mistake in
>>>>> my explorations) and can handle elements that arrive out-of-order
>>>>> (which happens basically always in real-world windowing use-cases).
>>>>>
>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <[hidden email]> wrote:
>>>>>
>>>>>> What I like a lot about Aljoscha's proposed design is that we need no
>>>>>> different code for "system time" vs. "event time". It only differs in
>>>>>> where the timestamps are assigned.
>>>>>>
>>>>>> The OOP approach also gives you the semantics of total ordering
>>>>>> without imposing merges on the streams.
>>>>>>
>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <[hidden email]> wrote:
>>>>>>
>>>>>>> I agree that there should be multiple alternatives the user(!) can
>>>>>>> choose from. Partial out-of-order processing works for many/most
>>>>>>> aggregates. However, if you consider event-pattern matching, global
>>>>>>> ordering is necessary (even if the performance penalty might be
>>>>>>> high).
>>>>>>>
>>>>>>> I would also keep "system-time windows" as an alternative to "source
>>>>>>> assigned ts-windows".
>>>>>>>
>>>>>>> It might also be interesting to consider the following paper for
>>>>>>> overlapping windows: "Resource sharing in continuous sliding-window
>>>>>>> aggregates"
>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>>>>>>>> Hey,
>>>>>>>>
>>>>>>>> I think we should not block PRs unnecessarily if your suggested
>>>>>>>> changes might touch them at some point.
>>>>>>>>
>>>>>>>> Also, I still think we should not put everything in the DataStream
>>>>>>>> because it will be a huge mess.
>>>>>>>>
>>>>>>>> Also, we need to agree on the out-of-order processing, whether we
>>>>>>>> want it the way you proposed it (which is quite costly). Another
>>>>>>>> alternative approach that fits in the current windowing is to
>>>>>>>> filter out out-of-order events and apply a special handling
>>>>>>>> operator on them. This would be fairly lightweight.
>>>>>>>>
>>>>>>>> My point is that we need to consider some alternative solutions.
>>>>>>>> And we should not block contributions along the way.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> The reason I posted this now is that we need to think about the
>>>>>>>>> API and windowing before proceeding with the PRs of Gabor (inverse
>>>>>>>>> reduce) and Gyula (removal of "aggregate" functions on DataStream).
>>>>>>>>>
>>>>>>>>> For the windowing, I think that the current model does not work
>>>>>>>>> for out-of-order processing. Therefore, the whole windowing
>>>>>>>>> infrastructure will basically have to be redone, meaning also that
>>>>>>>>> any work on the pre-aggregators or optimizations that we do now
>>>>>>>>> becomes useless.
>>>>>>>>>
>>>>>>>>> For the API, I proposed to restructure the interactions between
>>>>>>>>> all the different *DataStream classes and grouping/windowing. (See
>>>>>>>>> the API section of the doc I posted.)
>>>>>>>>>
>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>
>>>>>>>>>> Thanks for the nice summary, this is a very good initiative.
>>>>>>>>>>
>>>>>>>>>> I added some comments to the respective sections (where I didn't
>>>>>>>>>> fully agree :).).
>>>>>>>>>> At some point I think it would be good to have a public hangout
>>>>>>>>>> session on this, which could make for a more dynamic discussion.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Gyula
>>>>>>>>>>
>>>>>>>>>> Aljoscha Krettek <[hidden email]> wrote (on Mon, 22 Jun 2015, 21:34):
>>>>>>>>>>
>>>>>>>>>>> [...]
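The watermark-driven windowing Aljoscha describes above can be sketched roughly as follows. This is a simplified illustration, not the code from the linked branch; the class and method names are made up for the sketch:

```python
from collections import defaultdict

class EventTimeWindower:
    """Toy tumbling-window operator: buffer elements by event-time window,
    emit a window only once a watermark guarantees it is complete."""

    def __init__(self, size):
        self.size = size
        self.buffers = defaultdict(list)  # window start -> buffered elements

    def element(self, value, timestamp):
        # Assign the element to the tumbling window covering its timestamp.
        start = (timestamp // self.size) * self.size
        self.buffers[start].append(value)

    def watermark(self, wm):
        # Emit (and drop) every in-flight window whose end is <= the watermark.
        fired = []
        for start in sorted(self.buffers):
            if start + self.size <= wm:
                fired.append(sorted(self.buffers.pop(start)))
        return fired

# The doubled-integer example from the thread: two copies of 0..7 arriving
# interleaved out of order still land in the correct event-time windows.
op = EventTimeWindower(size=4)
for t in [0, 1, 0, 2, 1, 3, 2, 4, 3, 5, 4, 6, 5, 7, 6, 7]:
    op.element(t, t)
windows = op.watermark(8)
# windows == [[0, 0, 1, 1, 2, 2, 3, 3], [4, 4, 5, 5, 6, 6, 7, 7]]
```

With this scheme the windows come out correct regardless of arrival order, because emission is deferred until the watermark promises that no earlier-timestamped elements are still in flight.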
Hello,
Aljoscha, can you please try the performance test of Current/Reduce with
the InversePreReducer in PR 856? (If you just call sum, it will use an
InversePreReducer.) It would be an interesting test, because the inverse
function optimization really depends on the stream being ordered, and I
think it has the potential of being faster than Next/Reduce, especially if
the window size is much larger than the slide size.

Best regards,
Gabor
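For context, the inverse-function idea Gabor refers to can be sketched like this. This is a toy stand-in, not the implementation in PR 856: for an invertible reduce such as sum, the operator slides the window by applying the inverse of the reduce function to departing elements instead of re-aggregating everything that remains:

```python
from collections import deque

class InverseSlidingSum:
    """Toy count-based sliding sum. Eviction applies the inverse of the
    reduce function (subtraction) to the departing element, so each update
    is O(1) no matter how large the window is."""

    def __init__(self, window_size):
        self.window = deque()
        self.window_size = window_size
        self.total = 0

    def add(self, value):
        self.window.append(value)
        self.total += value
        if len(self.window) > self.window_size:
            self.total -= self.window.popleft()  # inverse-reduce step
        return self.total

r = InverseSlidingSum(window_size=3)
sums = [r.add(v) for v in [1, 2, 3, 4, 5]]
# sums == [1, 3, 6, 9, 12]
```

This is also why the optimization depends on the stream being ordered: the inverse step removes whichever element the operator believes is the oldest one in the window.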
Hi,
I also ran the tests on top of PR 856 (inverse reducer) now. The results seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 sec) (Theoretically there would be 5000 tuples in 5 seconds but this is due to overhead). These are the results for the inverse reduce optimisation: (Tuple 0,38) (Tuple 0,829) (Tuple 0,1625) (Tuple 0,2424) (Tuple 0,3190) (Tuple 0,3198) (Tuple 0,-339368) (Tuple 0,-1315725) (Tuple 0,-2932932) (Tuple 0,-5082735) (Tuple 0,-7743256) (Tuple 0,75701046) (Tuple 0,642829470) (Tuple 0,2242018381) (Tuple 0,5190708618) (Tuple 0,10060360311) (Tuple 0,-94254951) (Tuple 0,-219806321293) (Tuple 0,-1258895232699) (Tuple 0,-4074432596329) One line is one emitted window count. This is what happens when I remove the Thread.sleep(1): (Tuple 0,660676) (Tuple 0,2553733) (Tuple 0,3542696) (Tuple 0,1) (Tuple 0,1107035) (Tuple 0,2549491) (Tuple 0,4100387) (Tuple 0,-8406583360092) (Tuple 0,-8406582150743) (Tuple 0,-8406580427190) (Tuple 0,-8406580427190) (Tuple 0,-8406580427190) (Tuple 0,6847279255682044995) (Tuple 0,6847279255682044995) (Tuple 0,-5390528042713628318) (Tuple 0,-5390528042711551780) (Tuple 0,-5390528042711551780) So at some point the pre-reducer seems to go haywire and does not recover from it. The good thing is that it does produce results now, where the previous Current/Reduce would simply hang and not produce any output. On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <[hidden email]> wrote: > Hello, > > Aljoscha, can you please try the performance test of Current/Reduce > with the InversePreReducer in PR 856? (If you just call sum, it will > use an InversePreReducer.) It would be an interesting test, because > the inverse function optimization really depends on the stream being > ordered, and I think it has the potential of being faster then > Next/Reduce. Especially if the window size is much larger than the > slide size. 
> > Best regards, > Gabor > > > 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <[hidden email]>: > > I think I'll have to elaborate a bit so I created a proof-of-concept > > implementation of my Ideas and ran some throughput measurements to > > alleviate concerns about performance. > > > > First, though, I want to highlight again why the current approach does > not > > work with out-of-order elements (which, again, occur constantly due to > the > > distributed nature of the system). This is the example I posted earlier: > > https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks > like > > this: > > > > +--+ > > | | Source > > +--+ > > | > > +-----+ > > | | > > | +--+ > > | | | Identity Map > > | +--+ > > | | > > +-----+ > > | > > +--+ > > | | Window > > +--+ > > | > > | > > +--+ > > | | Sink > > +--+ > > > > So all it does is pass the elements through an identity map and then > merge > > them again before the window operator. The source emits ascending > integers > > and the window operator has a custom timestamp extractor that uses the > > integer itself as the timestamp and should create windows of size 4 (that > > is elements with timestamp 0-3 are one window, the next are the elements > > with timestamp 4-8, and so on). Since the topology basically doubles the > > elements form the source I would expect to get these windows: > > Window: 0, 0, 1, 1, 2, 2, 3, 3 > > Window: 4, 4, 6, 6, 7, 7, 8, 8 > > > > The output is this, however: > > Window: 0, 1, 2, 3, > > Window: 4, 0, 1, 2, 3, 4, 5, 6, 7, > > Window: 8, 9, 10, 11, > > Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, > > Window: 16, 17, 18, 19, > > Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, > > Window: 24, 25, 26, 27, > > Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, > > > > The reason is that the elements simply arrive out-of-order. Imagine what > > would happen if the elements actually arrived with some delay from > > different operations. 
> > > > Now, on to the performance numbers. The proof-of-concept I created is > > available here: > > https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The > basic > > idea is that sources assign the current timestamp when emitting elements. > > They also periodically emit watermarks that tell us that no elements with > > an earlier timestamp will be emitted. The watermarks propagate through > the > > operators. The window operator looks at the timestamp of an element and > > puts it into the buffer that corresponds to that window. When the window > > operator receives a watermark it will look at the in-flight windows > > (basically the buffers) and emit those windows where the window-end is > > before the watermark. > > > > For measuring throughput I did the following: The source emits tuples of > > the form ("tuple", 1) in an infinite loop. The window operator sums up > the > > tuples, thereby counting how many tuples the window operator can handle > in > > a given time window. There are two different implementations for the > > summation: 1) simply summing up the values in a mapWindow(), there you > get > > a List of all tuples and simple iterate over it. 2) using sum(1), which > is > > implemented as a reduce() (that uses the pre-reducer optimisations). > > > > These are the performance numbers (Current is the current implementation, > > Next is my proof-of-concept): > > > > Tumbling (1 sec): > > - Current/Map: 1.6 mio > > - Current/Reduce: 2 mio > > - Next/Map: 2.2 mio > > - Next/Reduce: 4 mio > > > > Sliding (5 sec, slide 1 sec): > > - Current/Map: ca 3 mio (fluctuates a lot) > > - Current/Reduce: No output > > - Next/Map: ca 4 mio (fluctuates) > > - Next/Reduce: 10 mio > > > > The Next/Reduce variant can basically scale indefinitely with window size > > because the internal state does not rely on the number of elements (it is > > just the current sum). 
The pre-reducer for sliding elements cannot handle > > the amount of tuples, it produces no output. For the two Map variants the > > performance fluctuates because they always keep all the elements in an > > internal buffer before emission, this seems to tax the garbage collector > a > > bit and leads to random pauses. > > > > One thing that should be noted is that I had to disable the fake-element > > emission thread, otherwise the Current versions would deadlock. > > > > So, I started working on this because I thought that out-of-order > > processing would be necessary for correctness. And it is certainly, But > the > > proof-of-concept also shows that performance can be greatly improved. > > > > On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <[hidden email]> wrote: > >> > >> I agree lets separate these topics from each other so we can get faster > >> resolution. > >> > >> There is already a state discussion in the thread we started with Paris. > >> > >> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <[hidden email]> > > wrote: > >> > >> > I agree with supporting out-of-order out of the box :-), even if this > > means > >> > a major refactoring. This is the right time to refactor the streaming > > API > >> > before we pull it out of beta. I think that this is more important > than > > new > >> > features in the streaming API, which can be prioritized once the API > is > > out > >> > of beta (meaning, that IMO this is the right time to stall PRs until > we > >> > agree on the design). > >> > > >> > There are three sections in the document: windowing, state, and API. > How > >> > convoluted are those with each other? Can we separate the discussion > or > > do > >> > we need to discuss those all together? I think part of the difficulty > is > >> > that we are discussing three design choices at once. > >> > > >> > On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <[hidden email]> > >> > wrote: > >> > > >> > > Out of order is ubiquitous in the real-world. 
> >> > > Typically, what happens is
> >> > > that businesses will declare a maximum allowable delay for delayed
> >> > > transactions and will commit to results when that delay is reached.
> >> > > Transactions that arrive later than this cutoff are collected specially as
> >> > > corrections which are reported/used when possible.
> >> > >
> >> > > Clearly, ordering can also be violated during processing, but if the data
> >> > > is originally out of order the situation can't be repaired by any protocol
> >> > > fixes that prevent transactions from becoming disordered; it has to be
> >> > > handled at the data level.
> >> > >
> >> > > On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <[hidden email]>
> >> > > wrote:
> >> > >
> >> > > > I also don't like big changes but sometimes they are necessary. The reason
> >> > > > why I'm so adamant about out-of-order processing is that out-of-order
> >> > > > elements are not some exception that occurs once in a while; they occur
> >> > > > constantly in a distributed system. For example, in this:
> >> > > > https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting
> >> > > > windows are completely bogus because the current windowing system assumes
> >> > > > elements to globally arrive in order, which is simply not true. (The
> >> > > > example has a source that generates increasing integers. Then these pass
> >> > > > through a map and are unioned with the original DataStream before a window
> >> > > > operator.) This simulates elements arriving from different operators at a
> >> > > > windowing operator. The example is also DOP=1; I imagine this gets worse
> >> > > > with higher DOP.
> >> > > >
> >> > > > What do you mean by costly? As I said, I have a proof-of-concept
> >> > > > windowing operator that can handle out-of-order elements.
> >> > > > This is an example using
> >> > > > the current Flink API:
> >> > > > https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
> >> > > > (It is an infinite source of tuples and a 5 second window operator that
> >> > > > counts the tuples.) The first problem is that this code deadlocks because
> >> > > > of the thread that emits fake elements. If I disable the fake element code
> >> > > > it works, but the throughput using my mockup is 4 times higher. The gap
> >> > > > widens dramatically if the window size increases.
> >> > > >
> >> > > > So, it actually increases performance (unless I'm making a mistake in my
> >> > > > explorations) and can handle elements that arrive out-of-order (which
> >> > > > happens basically always in real-world windowing use cases).
> >> > > >
> >> > > > On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <[hidden email]> wrote:
> >> > > >
> >> > > > > What I like a lot about Aljoscha's proposed design is that we need no
> >> > > > > different code for "system time" vs. "event time". It only differs in
> >> > > > > where the timestamps are assigned.
> >> > > > >
> >> > > > > The OOP approach also gives you the semantics of total ordering without
> >> > > > > imposing merges on the streams.
> >> > > > >
> >> > > > > On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <
> >> > > > > [hidden email]> wrote:
> >> > > > >
> >> > > > > > I agree that there should be multiple alternatives the user(!) can
> >> > > > > > choose from. Partial out-of-order processing works for many/most
> >> > > > > > aggregates. However, if you consider Event-Pattern-Matching, global
> >> > > > > > ordering is necessary (even if the performance penalty might be high).
> >> > > > > >
> >> > > > > > I would also keep "system-time windows" as an alternative to "source
> >> > > > > > assigned ts-windows".
> >> > > > > >
> >> > > > > > It might also be interesting to consider the following paper for
> >> > > > > > overlapping windows: "Resource sharing in continuous sliding-window
> >> > > > > > aggregates"
> >> > > > > >
> >> > > > > > https://dl.acm.org/citation.cfm?id=1316720
> >> > > > > >
> >> > > > > > -Matthias
> >> > > > > >
> >> > > > > > On 06/23/2015 10:37 AM, Gyula Fóra wrote:
> >> > > > > > > Hey,
> >> > > > > > >
> >> > > > > > > I think we should not block PRs unnecessarily if your suggested changes
> >> > > > > > > might touch them at some point.
> >> > > > > > >
> >> > > > > > > Also I still think we should not put everything in the DataStream
> >> > > > > > > because it will be a huge mess.
> >> > > > > > >
> >> > > > > > > Also we need to agree on the out-of-order processing, whether we want
> >> > > > > > > it the way you proposed it (which is quite costly). Another alternative
> >> > > > > > > approach, which fits in the current windowing, is to filter out
> >> > > > > > > out-of-order events and apply a special handling operator on them. This
> >> > > > > > > would be fairly lightweight.
> >> > > > > > >
> >> > > > > > > My point is that we need to consider some alternative solutions. And we
> >> > > > > > > should not block contributions along the way.
> >> > > > > > >
> >> > > > > > > Cheers,
> >> > > > > > > Gyula
> >> > > > > > >
> >> > > > > > > On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <[hidden email]>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > >> The reason I posted this now is that we need to think about the API
> >> > > > > > >> and windowing before proceeding with the PRs of Gabor (inverse reduce)
> >> > > > > > >> and Gyula (removal of "aggregate" functions on DataStream).
> >> > > > > >>
> >> > > > > >> For the windowing, I think that the current model does not work for
> >> > > > > >> out-of-order processing. Therefore, the whole windowing infrastructure
> >> > > > > >> will basically have to be redone. Meaning also that any work on the
> >> > > > > >> pre-aggregators or optimizations that we do now becomes useless.
> >> > > > > >>
> >> > > > > >> For the API, I proposed to restructure the interactions between all the
> >> > > > > >> different *DataStream classes and grouping/windowing. (See API section
> >> > > > > >> of the doc I posted.)
|
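The mechanism described in the message above (timestamps assigned at the sources, periodic watermarks, one buffer per window, emission once the watermark passes the window end) boils down to something like the following framework-free sketch. All class and method names here are invented for illustration and do not correspond to the actual code in the linked branch:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Rough sketch of a watermark-driven tumbling-window operator. Names are
// made up; the real proof-of-concept is integrated with Flink's runtime.
public class WatermarkWindowSketch {
    private final long windowSize;
    // window start timestamp -> elements buffered for that window
    private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();

    public WatermarkWindowSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    // Elements may arrive in any order; the timestamp alone decides the window.
    public void element(long timestamp, long value) {
        long windowStart = timestamp - (timestamp % windowSize);
        buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(value);
    }

    // A watermark promises that no element with an earlier timestamp will
    // arrive, so every window ending at or before it is complete and emitted.
    public List<List<Long>> watermark(long watermark) {
        List<List<Long>> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, List<Long>>> it = buffers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> entry = it.next();
            if (entry.getKey() + windowSize <= watermark) {
                emitted.add(entry.getValue());
                it.remove();
            } else {
                break; // TreeMap iterates in key order; later windows are still open
            }
        }
        return emitted;
    }
}
```

Note that an element that arrives "late" relative to other elements, but before the watermark for its window, still lands in the right buffer; nothing in the operator depends on arrival order.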
I'm very sorry, I had a bug in the InversePreReducer. It should be fixed now. Can you please run it again?

I also tried to reproduce some of your performance numbers, but I'm getting less than 1/10th of yours. For example, in the Tumbling case, Current/Reduce produces only ~100000 for me. Do you have any idea what I could be doing wrong? My code:
http://pastebin.com/zbEjmGhk
I am running it on a 2 GHz Core i7.

Best regards,
Gabor


2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <[hidden email]>:
> Hi,
> I also ran the tests on top of PR 856 (inverse reducer) now. The results
> seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all
> the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 sec).
> (Theoretically there would be 5000 tuples in 5 seconds but this is due to
> overhead.) These are the results for the inverse reduce optimisation:
> (Tuple 0,38)
> (Tuple 0,829)
> (Tuple 0,1625)
> (Tuple 0,2424)
> (Tuple 0,3190)
> (Tuple 0,3198)
> (Tuple 0,-339368)
> (Tuple 0,-1315725)
> (Tuple 0,-2932932)
> (Tuple 0,-5082735)
> (Tuple 0,-7743256)
> (Tuple 0,75701046)
> (Tuple 0,642829470)
> (Tuple 0,2242018381)
> (Tuple 0,5190708618)
> (Tuple 0,10060360311)
> (Tuple 0,-94254951)
> (Tuple 0,-219806321293)
> (Tuple 0,-1258895232699)
> (Tuple 0,-4074432596329)
>
> One line is one emitted window count. This is what happens when I remove
> the Thread.sleep(1):
> (Tuple 0,660676)
> (Tuple 0,2553733)
> (Tuple 0,3542696)
> (Tuple 0,1)
> (Tuple 0,1107035)
> (Tuple 0,2549491)
> (Tuple 0,4100387)
> (Tuple 0,-8406583360092)
> (Tuple 0,-8406582150743)
> (Tuple 0,-8406580427190)
> (Tuple 0,-8406580427190)
> (Tuple 0,-8406580427190)
> (Tuple 0,6847279255682044995)
> (Tuple 0,6847279255682044995)
> (Tuple 0,-5390528042713628318)
> (Tuple 0,-5390528042711551780)
> (Tuple 0,-5390528042711551780)
>
> So at some point the pre-reducer seems to go haywire and does not recover
> from it. The good thing is that it does produce results now, where the
> previous Current/Reduce would simply hang and not produce any output.
> > On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <[hidden email]> wrote:
> >
>> Hello,
>>
>> Aljoscha, can you please try the performance test of Current/Reduce
>> with the InversePreReducer in PR 856? (If you just call sum, it will
>> use an InversePreReducer.) It would be an interesting test, because
>> the inverse function optimization really depends on the stream being
>> ordered, and I think it has the potential of being faster than
>> Next/Reduce. Especially if the window size is much larger than the
>> slide size.
>>
>> Best regards,
>> Gabor
>>
>>
>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <[hidden email]>:
>> > I think I'll have to elaborate a bit so I created a proof-of-concept
>> > implementation of my ideas and ran some throughput measurements to
>> > alleviate concerns about performance.
>> >
>> > First, though, I want to highlight again why the current approach does
>> > not work with out-of-order elements (which, again, occur constantly due
>> > to the distributed nature of the system). This is the example I posted
>> > earlier: https://gist.github.com/aljoscha/a367012646ab98208d27. The plan
>> > looks like this:
>> >
>> >      +--+
>> >      |  | Source
>> >      +--+
>> >       |
>> >    +-----+
>> >    |     |
>> >    |    +--+
>> >    |    |  | Identity Map
>> >    |    +--+
>> >    |     |
>> >    +-----+
>> >       |
>> >      +--+
>> >      |  | Window
>> >      +--+
>> >       |
>> >       |
>> >      +--+
>> >      |  | Sink
>> >      +--+
>> >
>> > So all it does is pass the elements through an identity map and then
>> > merge them again before the window operator. The source emits ascending
>> > integers and the window operator has a custom timestamp extractor that
>> > uses the integer itself as the timestamp and should create windows of
>> > size 4 (that is, elements with timestamps 0-3 are one window, the next
>> > are the elements with timestamps 4-7, and so on).
>> > Since the topology basically doubles the elements from the source I
>> > would expect to get these windows:
>> > Window: 0, 0, 1, 1, 2, 2, 3, 3
>> > Window: 4, 4, 5, 5, 6, 6, 7, 7
>> >
>> > The output is this, however:
>> > Window: 0, 1, 2, 3,
>> > Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
>> > Window: 8, 9, 10, 11,
>> > Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
>> > Window: 16, 17, 18, 19,
>> > Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
>> > Window: 24, 25, 26, 27,
>> > Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
>> >
>> > The reason is that the elements simply arrive out-of-order. Imagine what
>> > would happen if the elements actually arrived with some delay from
>> > different operators.
|
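The idea behind an inverse pre-reducer — maintaining one running aggregate and subtracting expiring panes instead of re-reducing the whole window on every slide — can be sketched as follows for a sliding sum. This is only an illustration of the concept with invented names, not the actual InversePreReducer from PR 856, which is integrated with Flink's windowing internals:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of an "inverse reduce" sliding-window sum: each slide interval
// forms a pane; the window aggregate is maintained incrementally by adding
// new panes and subtracting expiring ones. This only works for invertible
// reduce functions (like sum) and, as noted in the thread, relies on
// elements and panes arriving in order.
public class InverseSlidingSumSketch {
    private final int panesPerWindow; // window size divided by slide size
    private final Deque<Long> panes = new ArrayDeque<>();
    private long currentPane = 0;
    private long windowSum = 0;

    public InverseSlidingSumSketch(int panesPerWindow) {
        this.panesPerWindow = panesPerWindow;
    }

    // Fold an arriving element into the pane that is currently open.
    public void add(long value) {
        currentPane += value;
    }

    // Called once per slide interval; returns the aggregate over the last
    // `panesPerWindow` panes in O(1), independent of the window size.
    public long slide() {
        panes.addLast(currentPane);
        windowSum += currentPane;
        currentPane = 0;
        if (panes.size() > panesPerWindow) {
            windowSum -= panes.removeFirst(); // inverse step: subtract expiring pane
        }
        return windowSum;
    }
}
```

If the subtraction ever removes a value that was not actually part of the window (for example because elements were attributed to the wrong pane), the error never heals: every subsequent result carries the corrupted running sum, which would be consistent with the diverging numbers reported above.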
Yes, now this also processes about 3 mio Elements (Window Size 5 sec, Slide
1 sec) but it still fluctuates a lot between 1 mio. and 5 mio. Performance is not my main concern, however. My concern is that the current model assumes elements to arrive in order, which is simply not true. In your code you have these lines for specifying the window: .window(Time.of(1l, TimeUnit.SECONDS)) .every(Time.of(1l, TimeUnit.SECONDS)) Although this semantically specifies a tumbling window of size 1 sec I'm afraid it uses the sliding window logic internally (because of the .every()). In my tests I only have the first line. On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <[hidden email]> wrote: > I'm very sorry, I had a bug in the InversePreReducer. It should be > fixed now. Can you please run it again? > > I also tried to reproduce some of your performance numbers, but I'm > getting only less than 1/10th of yours. For example, in the Tumbling > case, Current/Reduce produces only ~100000 for me. Do you have any > idea what I could be doing wrong? My code: > http://pastebin.com/zbEjmGhk > I am running it on a 2 GHz Core i7. > > Best regards, > Gabor > > > 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <[hidden email]>: > > Hi, > > I also ran the tests on top of PR 856 (inverse reducer) now. The results > > seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all > > the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 sec) > > (Theoretically there would be 5000 tuples in 5 seconds but this is due to > > overhead). 
These are the results for the inverse reduce optimisation: > > (Tuple 0,38) > > (Tuple 0,829) > > (Tuple 0,1625) > > (Tuple 0,2424) > > (Tuple 0,3190) > > (Tuple 0,3198) > > (Tuple 0,-339368) > > (Tuple 0,-1315725) > > (Tuple 0,-2932932) > > (Tuple 0,-5082735) > > (Tuple 0,-7743256) > > (Tuple 0,75701046) > > (Tuple 0,642829470) > > (Tuple 0,2242018381) > > (Tuple 0,5190708618) > > (Tuple 0,10060360311) > > (Tuple 0,-94254951) > > (Tuple 0,-219806321293) > > (Tuple 0,-1258895232699) > > (Tuple 0,-4074432596329) > > > > One line is one emitted window count. This is what happens when I remove > > the Thread.sleep(1): > > (Tuple 0,660676) > > (Tuple 0,2553733) > > (Tuple 0,3542696) > > (Tuple 0,1) > > (Tuple 0,1107035) > > (Tuple 0,2549491) > > (Tuple 0,4100387) > > (Tuple 0,-8406583360092) > > (Tuple 0,-8406582150743) > > (Tuple 0,-8406580427190) > > (Tuple 0,-8406580427190) > > (Tuple 0,-8406580427190) > > (Tuple 0,6847279255682044995) > > (Tuple 0,6847279255682044995) > > (Tuple 0,-5390528042713628318) > > (Tuple 0,-5390528042711551780) > > (Tuple 0,-5390528042711551780) > > > > So at some point the pre-reducer seems to go haywire and does not recover > > from it. The good thing is that it does produce results now, where the > > previous Current/Reduce would simply hang and not produce any output. > > > > On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <[hidden email]> wrote: > > > >> Hello, > >> > >> Aljoscha, can you please try the performance test of Current/Reduce > >> with the InversePreReducer in PR 856? (If you just call sum, it will > >> use an InversePreReducer.) It would be an interesting test, because > >> the inverse function optimization really depends on the stream being > >> ordered, and I think it has the potential of being faster then > >> Next/Reduce. Especially if the window size is much larger than the > >> slide size. 
> >> > >> Best regards, > >> Gabor > >> > >> > >> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <[hidden email]>: > >> > I think I'll have to elaborate a bit so I created a proof-of-concept > >> > implementation of my Ideas and ran some throughput measurements to > >> > alleviate concerns about performance. > >> > > >> > First, though, I want to highlight again why the current approach does > >> not > >> > work with out-of-order elements (which, again, occur constantly due to > >> the > >> > distributed nature of the system). This is the example I posted > earlier: > >> > https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks > >> like > >> > this: > >> > > >> > +--+ > >> > | | Source > >> > +--+ > >> > | > >> > +-----+ > >> > | | > >> > | +--+ > >> > | | | Identity Map > >> > | +--+ > >> > | | > >> > +-----+ > >> > | > >> > +--+ > >> > | | Window > >> > +--+ > >> > | > >> > | > >> > +--+ > >> > | | Sink > >> > +--+ > >> > > >> > So all it does is pass the elements through an identity map and then > >> merge > >> > them again before the window operator. The source emits ascending > >> integers > >> > and the window operator has a custom timestamp extractor that uses the > >> > integer itself as the timestamp and should create windows of size 4 > (that > >> > is elements with timestamp 0-3 are one window, the next are the > elements > >> > with timestamp 4-8, and so on). 
Since the topology basically doubles > the > >> > elements form the source I would expect to get these windows: > >> > Window: 0, 0, 1, 1, 2, 2, 3, 3 > >> > Window: 4, 4, 6, 6, 7, 7, 8, 8 > >> > > >> > The output is this, however: > >> > Window: 0, 1, 2, 3, > >> > Window: 4, 0, 1, 2, 3, 4, 5, 6, 7, > >> > Window: 8, 9, 10, 11, > >> > Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, > >> > Window: 16, 17, 18, 19, > >> > Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, > >> > Window: 24, 25, 26, 27, > >> > Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, > >> > > >> > The reason is that the elements simply arrive out-of-order. Imagine > what > >> > would happen if the elements actually arrived with some delay from > >> > different operations. > >> > > >> > Now, on to the performance numbers. The proof-of-concept I created is > >> > available here: > >> > https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The > >> basic > >> > idea is that sources assign the current timestamp when emitting > elements. > >> > They also periodically emit watermarks that tell us that no elements > with > >> > an earlier timestamp will be emitted. The watermarks propagate through > >> the > >> > operators. The window operator looks at the timestamp of an element > and > >> > puts it into the buffer that corresponds to that window. When the > window > >> > operator receives a watermark it will look at the in-flight windows > >> > (basically the buffers) and emit those windows where the window-end is > >> > before the watermark. > >> > > >> > For measuring throughput I did the following: The source emits tuples > of > >> > the form ("tuple", 1) in an infinite loop. The window operator sums up > >> the > >> > tuples, thereby counting how many tuples the window operator can > handle > >> in > >> > a given time window. 
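[Editor's note] The watermark mechanism described above (put each element into the buffer of the window its timestamp falls into, and emit a window once a watermark passes its end) can be sketched roughly like this. This is a simplified illustration with invented names, not the actual proof-of-concept code:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of watermark-triggered windowing: elements go into the buffer
// of the window their timestamp falls into, and a watermark triggers
// emission of every window whose end is at or before the watermark.
public class WatermarkWindows {
    private final long windowSize;
    // window start -> buffered elements, ordered by window start
    private final TreeMap<Long, List<Long>> inFlight = new TreeMap<>();

    public WatermarkWindows(long windowSize) {
        this.windowSize = windowSize;
    }

    // Tumbling windows: timestamp t belongs to the window starting at
    // floor(t / size) * size, regardless of arrival order.
    public void element(long timestamp) {
        long windowStart = (timestamp / windowSize) * windowSize;
        inFlight.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(timestamp);
    }

    // Return all windows whose end <= watermark; no elements with an
    // earlier timestamp can arrive anymore, so they are complete.
    public List<List<Long>> watermark(long watermark) {
        List<List<Long>> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, List<Long>>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> e = it.next();
            if (e.getKey() + windowSize <= watermark) {
                emitted.add(e.getValue());
                it.remove();
            } else {
                break; // TreeMap is ordered; later windows are incomplete
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkWindows op = new WatermarkWindows(4);
        // Out-of-order arrival, as in the union example above.
        for (long t : new long[] {0, 1, 4, 2, 0, 3, 5}) op.element(t);
        List<List<Long>> done = op.watermark(4); // window [0, 4) is complete
        if (done.size() != 1 || done.get(0).size() != 5)
            throw new AssertionError("expected one window of 5 elements");
        System.out.println(done);
    }
}
```

Note how the out-of-order elements 2, 0, 3 still end up in the correct window, because assignment is by timestamp rather than by arrival order.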
There are two different implementations for the > >> > summation: 1) simply summing up the values in a mapWindow(), there you > >> get > >> > a List of all tuples and simple iterate over it. 2) using sum(1), > which > >> is > >> > implemented as a reduce() (that uses the pre-reducer optimisations). > >> > > >> > These are the performance numbers (Current is the current > implementation, > >> > Next is my proof-of-concept): > >> > > >> > Tumbling (1 sec): > >> > - Current/Map: 1.6 mio > >> > - Current/Reduce: 2 mio > >> > - Next/Map: 2.2 mio > >> > - Next/Reduce: 4 mio > >> > > >> > Sliding (5 sec, slide 1 sec): > >> > - Current/Map: ca 3 mio (fluctuates a lot) > >> > - Current/Reduce: No output > >> > - Next/Map: ca 4 mio (fluctuates) > >> > - Next/Reduce: 10 mio > >> > > >> > The Next/Reduce variant can basically scale indefinitely with window > size > >> > because the internal state does not rely on the number of elements > (it is > >> > just the current sum). The pre-reducer for sliding elements cannot > handle > >> > the amount of tuples, it produces no output. For the two Map variants > the > >> > performance fluctuates because they always keep all the elements in an > >> > internal buffer before emission, this seems to tax the garbage > collector > >> a > >> > bit and leads to random pauses. > >> > > >> > One thing that should be noted is that I had to disable the > fake-element > >> > emission thread, otherwise the Current versions would deadlock. > >> > > >> > So, I started working on this because I thought that out-of-order > >> > processing would be necessary for correctness. And it is certainly, > But > >> the > >> > proof-of-concept also shows that performance can be greatly improved. > >> > > >> > On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <[hidden email]> wrote: > >> >> > >> >> I agree lets separate these topics from each other so we can get > faster > >> >> resolution. 
> >> >> > >> >> There is already a state discussion in the thread we started with > Paris. > >> >> > >> >> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <[hidden email]> > >> > wrote: > >> >> > >> >> > I agree with supporting out-of-order out of the box :-), even if > this > >> > means > >> >> > a major refactoring. This is the right time to refactor the > streaming > >> > API > >> >> > before we pull it out of beta. I think that this is more important > >> than > >> > new > >> >> > features in the streaming API, which can be prioritized once the > API > >> is > >> > out > >> >> > of beta (meaning, that IMO this is the right time to stall PRs > until > >> we > >> >> > agree on the design). > >> >> > > >> >> > There are three sections in the document: windowing, state, and > API. > >> How > >> >> > convoluted are those with each other? Can we separate the > discussion > >> or > >> > do > >> >> > we need to discuss those all together? I think part of the > difficulty > >> is > >> >> > that we are discussing three design choices at once. > >> >> > > >> >> > On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning < > [hidden email]> > >> >> > wrote: > >> >> > > >> >> > > Out of order is ubiquitous in the real-world. Typically, what > >> > happens is > >> >> > > that businesses will declare a maximum allowable delay for > delayed > >> >> > > transactions and will commit to results when that delay is > reached. > >> >> > > Transactions that arrive later than this cutoff are collected > >> > specially > >> >> > as > >> >> > > corrections which are reported/used when possible. > >> >> > > > >> >> > > Clearly, ordering can also be violated during processing, but if > the > >> > data > >> >> > > is originally out of order the situation can't be repaired by any > >> >> > protocol > >> >> > > fixes that prevent transactions from becoming disordered but has > to > >> >> > handled > >> >> > > at the data level. 
> >> >> > > > >> >> > > > >> >> > > > >> >> > > > >> >> > > On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek < > >> [hidden email] > >> >> > >> >> > > wrote: > >> >> > > > >> >> > > > I also don't like big changes but sometimes they are necessary. > >> The > >> >> > > reason > >> >> > > > why I'm so adamant about out-of-order processing is that > >> > out-of-order > >> >> > > > elements are not some exception that occurs once in a while; > they > >> > occur > >> >> > > > constantly in a distributed system. For example, in this: > >> >> > > > https://gist.github.com/aljoscha/a367012646ab98208d27 the > >> resulting > >> >> > > > windows > >> >> > > > are completely bogus because the current windowing system > assumes > >> >> > > elements > >> >> > > > to globally arrive in order, which is simply not true. (The > >> example > >> >> > has a > >> >> > > > source that generates increasing integers. Then these pass > >> through a > >> >> > map > >> >> > > > and are unioned with the original DataStream before a window > >> > operator.) > >> >> > > > This simulates elements arriving from different operators at a > >> >> > windowing > >> >> > > > operator. The example is also DOP=1, I imagine this to get > worse > >> > with > >> >> > > > higher DOP. > >> >> > > > > >> >> > > > What do you mean by costly? As I said, I have a > proof-of-concept > >> >> > > windowing > >> >> > > > operator that can handle out-or-order elements. This is an > example > >> >> > using > >> >> > > > the current Flink API: > >> >> > > > https://gist.github.com/aljoscha/f8dce0691732e344bbe8. > >> >> > > > (It is an infinite source of tuples and a 5 second window > operator > >> > that > >> >> > > > counts the tuples.) The first problem is that this code > deadlocks > >> >> > because > >> >> > > > of the thread that emits fake elements. If I disable the fake > >> > element > >> >> > > code > >> >> > > > it works, but the throughput using my mockup is 4 times higher > . 
> >> The > >> >> > gap > >> >> > > > widens dramatically if the window size increases. > >> >> > > > > >> >> > > > So, it actually increases performance (unless I'm making a > mistake > >> > in > >> >> > my > >> >> > > > explorations) and can handle elements that arrive out-of-order > >> > (which > >> >> > > > happens basically always in a real-world windowing use-cases). > >> >> > > > > >> >> > > > > >> >> > > > On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <[hidden email]> > >> wrote: > >> >> > > > > >> >> > > > > What I like a lot about Aljoscha's proposed design is that we > >> > need no > >> >> > > > > different code for "system time" vs. "event time". It only > >> > differs in > >> >> > > > where > >> >> > > > > the timestamps are assigned. > >> >> > > > > > >> >> > > > > The OOP approach also gives you the semantics of total > ordering > >> >> > without > >> >> > > > > imposing merges on the streams. > >> >> > > > > > >> >> > > > > On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax < > >> >> > > > > [hidden email]> wrote: > >> >> > > > > > >> >> > > > > > I agree that there should be multiple alternatives the > user(!) > >> > can > >> >> > > > > > choose from. Partial out-of-order processing works for > >> many/most > >> >> > > > > > aggregates. However, if you consider > Event-Pattern-Matching, > >> > global > >> >> > > > > > ordering in necessary (even if the performance penalty > might > >> be > >> >> > > high). > >> >> > > > > > > >> >> > > > > > I would also keep "system-time windows" as an alternative > to > >> >> > "source > >> >> > > > > > assigned ts-windows". 
> >> >> > > > > > > >> >> > > > > > It might also be interesting to consider the following > paper > >> for > >> >> > > > > > overlapping windows: "Resource sharing in continuous > >> > sliding-window > >> >> > > > > > aggregates" > >> >> > > > > > > >> >> > > > > > > https://dl.acm.org/citation.cfm?id=1316720 > >> >> > > > > > > >> >> > > > > > -Matthias > >> >> > > > > > > >> >> > > > > > On 06/23/2015 10:37 AM, Gyula Fóra wrote: > >> >> > > > > > > Hey > >> >> > > > > > > > >> >> > > > > > > I think we should not block PRs unnecessarily if your > >> > suggested > >> >> > > > changes > >> >> > > > > > > might touch them at some point. > >> >> > > > > > > > >> >> > > > > > > Also I still think we should not put everything in the > >> > Datastream > >> >> > > > > because > >> >> > > > > > > it will be a huge mess. > >> >> > > > > > > > >> >> > > > > > > Also we need to agree on the out of order processing, > >> whether > >> > we > >> >> > > want > >> >> > > > > it > >> >> > > > > > > the way you proposed it(which is quite costly). Another > >> >> > alternative > >> >> > > > > > > approach there which fits in the current windowing is to > >> > filter > >> >> > out > >> >> > > > if > >> >> > > > > > > order events and apply a special handling operator on > them. > >> > This > >> >> > > > would > >> >> > > > > be > >> >> > > > > > > fairly lightweight. > >> >> > > > > > > > >> >> > > > > > > My point is that we need to consider some alternative > >> > solutions. > >> >> > > And > >> >> > > > we > >> >> > > > > > > should not block contributions along the way. 
> >> >> > > > > > > > >> >> > > > > > > Cheers > >> >> > > > > > > Gyula > >> >> > > > > > > > >> >> > > > > > > On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek < > >> >> > > > [hidden email]> > >> >> > > > > > > wrote: > >> >> > > > > > > > >> >> > > > > > >> The reason I posted this now is that we need to think > about > >> > the > >> >> > > API > >> >> > > > > and > >> >> > > > > > >> windowing before proceeding with the PRs of Gabor > (inverse > >> >> > reduce) > >> >> > > > and > >> >> > > > > > >> Gyula (removal of "aggregate" functions on DataStream). > >> >> > > > > > >> > >> >> > > > > > >> For the windowing, I think that the current model does > not > >> > work > >> >> > > for > >> >> > > > > > >> out-of-order processing. Therefore, the whole windowing > >> >> > > > infrastructure > >> >> > > > > > will > >> >> > > > > > >> basically have to be redone. Meaning also that any work > on > >> > the > >> >> > > > > > >> pre-aggregators or optimizations that we do now becomes > >> > useless. > >> >> > > > > > >> > >> >> > > > > > >> For the API, I proposed to restructure the interactions > >> > between > >> >> > > all > >> >> > > > > the > >> >> > > > > > >> different *DataStream classes and grouping/windowing. > (See > >> > API > >> >> > > > section > >> >> > > > > > of > >> >> > > > > > >> the doc I posted.) > >> >> > > > > > >> > >> >> > > > > > >> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra < > >> [hidden email] > >> >> > >> >> > > > wrote: > >> >> > > > > > >> > >> >> > > > > > >>> Hi Aljoscha, > >> >> > > > > > >>> > >> >> > > > > > >>> Thanks for the nice summary, this is a very good > >> initiative. > >> >> > > > > > >>> > >> >> > > > > > >>> I added some comments to the respective sections > (where I > >> > didnt > >> >> > > > fully > >> >> > > > > > >> agree > >> >> > > > > > >>> :).). 
> >> >> > > > > > >>> At some point I think it would be good to have a public > >> > hangout > >> >> > > > > session > >> >> > > > > > >> on > >> >> > > > > > >>> this, which could make a more dynamic discussion. > >> >> > > > > > >>> > >> >> > > > > > >>> Cheers, > >> >> > > > > > >>> Gyula > >> >> > > > > > >>> > >> >> > > > > > >>> Aljoscha Krettek <[hidden email]> ezt írta > (időpont: > >> >> > 2015. > >> >> > > > jún. > >> >> > > > > > >> 22., > >> >> > > > > > >>> H, 21:34): > >> >> > > > > > >>> > >> >> > > > > > >>>> Hi, > >> >> > > > > > >>>> with people proposing changes to the streaming part I > >> also > >> >> > > wanted > >> >> > > > to > >> >> > > > > > >>> throw > >> >> > > > > > >>>> my hat into the ring. :D > >> >> > > > > > >>>> > >> >> > > > > > >>>> During the last few months, while I was getting > >> acquainted > >> >> > with > >> >> > > > the > >> >> > > > > > >>>> streaming system, I wrote down some thoughts I had > about > >> > how > >> >> > > > things > >> >> > > > > > >> could > >> >> > > > > > >>>> be improved. Hopefully, they are in somewhat coherent > >> shape > >> >> > now, > >> >> > > > so > >> >> > > > > > >>> please > >> >> > > > > > >>>> have a look if you are interested in this: > >> >> > > > > > >>>> > >> >> > > > > > >>>> > >> >> > > > > > >>> > >> >> > > > > > >> > >> >> > > > > > > >> >> > > > > > >> >> > > > > >> >> > > > >> >> > > >> > > >> > https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing > >> >> > > > > > >>>> > >> >> > > > > > >>>> This mostly covers: > >> >> > > > > > >>>> - Timestamps assigned at sources > >> >> > > > > > >>>> - Out-of-order processing of elements in window > >> operators > >> >> > > > > > >>>> - API design > >> >> > > > > > >>>> > >> >> > > > > > >>>> Please let me know what you think. Comment in the > >> document > >> > or > >> >> > > here > >> >> > > > > in > >> >> > > > > > >> the > >> >> > > > > > >>>> mailing list. 
> >> >> > > > > > >>>> > >> >> > > > > > >>>> I have a PR in the makings that would introduce source > >> >> > > timestamps > >> >> > > > > and > >> >> > > > > > >>>> watermarks for keeping track of them. I also hacked a > >> >> > > > > proof-of-concept > >> >> > > > > > >>> of a > >> >> > > > > > >>>> windowing system that is able to process out-of-order > >> > elements > >> >> > > > > using a > >> >> > > > > > >>>> FlatMap operator. (It uses panes to perform efficient > >> >> > > > > > >> pre-aggregations.) > >> >> > > > > > >>>> > >> >> > > > > > >>>> Cheers, > >> >> > > > > > >>>> Aljoscha > >> >> > > > > > >>>> > >> >> > > > > > >>> > >> >> > > > > > >> > >> >> > > > > > > > >> >> > > > > > > >> >> > > > > > > >> >> > > > > > >> >> > > > > >> >> > > > >> >> > > >> > |
Hi Aljoscha,
I like that you are pushing in this direction. However, IMHO you misinterpret the current approach. It does not assume that tuples arrive in order; the current approach has no notion of a pre-defined order (for example, the order in which the events are created). There is only the notion of "arrival order" at the operator. From this "arrival-order" perspective, the results are correct(!). Windowing in the current approach means, for example, "sum up an attribute of all events you *received* in the last 5 seconds". That is a different meaning than "sum up an attribute of all events that *occurred* in the last 5 seconds". Both queries are valid and Flink should support both IMHO. -Matthias On 06/25/2015 03:03 PM, Aljoscha Krettek wrote: > Yes, now this also processes about 3 mio Elements (Window Size 5 sec, Slide > 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio. > > Performance is not my main concern, however. My concern is that the current > model assumes elements to arrive in order, which is simply not true. > > In your code you have these lines for specifying the window: > .window(Time.of(1l, TimeUnit.SECONDS)) > .every(Time.of(1l, TimeUnit.SECONDS)) > > Although this semantically specifies a tumbling window of size 1 sec I'm > afraid it uses the sliding window logic internally (because of the > .every()). > > In my tests I only have the first line. > > > On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <[hidden email]> wrote: > >> I'm very sorry, I had a bug in the InversePreReducer. It should be >> fixed now. Can you please run it again? >> >> I also tried to reproduce some of your performance numbers, but I'm >> getting only less than 1/10th of yours. For example, in the Tumbling >> case, Current/Reduce produces only ~100000 for me. Do you have any >> idea what I could be doing wrong? My code: >> http://pastebin.com/zbEjmGhk >> I am running it on a 2 GHz Core i7. 
>> >> Best regards, >> Gabor >> >> >> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <[hidden email]>: >>> Hi, >>> I also ran the tests on top of PR 856 (inverse reducer) now. The results >>> seem incorrect. When I insert a Thread.sleep(1) in the tuple source, all >>> the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 sec) >>> (Theoretically there would be 5000 tuples in 5 seconds but this is due to >>> overhead). These are the results for the inverse reduce optimisation: >>> (Tuple 0,38) >>> (Tuple 0,829) >>> (Tuple 0,1625) >>> (Tuple 0,2424) >>> (Tuple 0,3190) >>> (Tuple 0,3198) >>> (Tuple 0,-339368) >>> (Tuple 0,-1315725) >>> (Tuple 0,-2932932) >>> (Tuple 0,-5082735) >>> (Tuple 0,-7743256) >>> (Tuple 0,75701046) >>> (Tuple 0,642829470) >>> (Tuple 0,2242018381) >>> (Tuple 0,5190708618) >>> (Tuple 0,10060360311) >>> (Tuple 0,-94254951) >>> (Tuple 0,-219806321293) >>> (Tuple 0,-1258895232699) >>> (Tuple 0,-4074432596329) >>> >>> One line is one emitted window count. This is what happens when I remove >>> the Thread.sleep(1): >>> (Tuple 0,660676) >>> (Tuple 0,2553733) >>> (Tuple 0,3542696) >>> (Tuple 0,1) >>> (Tuple 0,1107035) >>> (Tuple 0,2549491) >>> (Tuple 0,4100387) >>> (Tuple 0,-8406583360092) >>> (Tuple 0,-8406582150743) >>> (Tuple 0,-8406580427190) >>> (Tuple 0,-8406580427190) >>> (Tuple 0,-8406580427190) >>> (Tuple 0,6847279255682044995) >>> (Tuple 0,6847279255682044995) >>> (Tuple 0,-5390528042713628318) >>> (Tuple 0,-5390528042711551780) >>> (Tuple 0,-5390528042711551780) >>> >>> So at some point the pre-reducer seems to go haywire and does not recover >>> from it. The good thing is that it does produce results now, where the >>> previous Current/Reduce would simply hang and not produce any output. >>> >>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <[hidden email]> wrote: >>> >>>> Hello, >>>> >>>> Aljoscha, can you please try the performance test of Current/Reduce >>>> with the InversePreReducer in PR 856? 
(If you just call sum, it will >>>> use an InversePreReducer.) It would be an interesting test, because >>>> the inverse function optimization really depends on the stream being >>>> ordered, and I think it has the potential of being faster then >>>> Next/Reduce. Especially if the window size is much larger than the >>>> slide size. >>>> >>>> Best regards, >>>> Gabor >>>> >>>> >>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <[hidden email]>: >>>>> I think I'll have to elaborate a bit so I created a proof-of-concept >>>>> implementation of my Ideas and ran some throughput measurements to >>>>> alleviate concerns about performance. >>>>> >>>>> First, though, I want to highlight again why the current approach does >>>> not >>>>> work with out-of-order elements (which, again, occur constantly due to >>>> the >>>>> distributed nature of the system). This is the example I posted >> earlier: >>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks >>>> like >>>>> this: >>>>> >>>>> +--+ >>>>> | | Source >>>>> +--+ >>>>> | >>>>> +-----+ >>>>> | | >>>>> | +--+ >>>>> | | | Identity Map >>>>> | +--+ >>>>> | | >>>>> +-----+ >>>>> | >>>>> +--+ >>>>> | | Window >>>>> +--+ >>>>> | >>>>> | >>>>> +--+ >>>>> | | Sink >>>>> +--+ >>>>> >>>>> So all it does is pass the elements through an identity map and then >>>> merge >>>>> them again before the window operator. The source emits ascending >>>> integers >>>>> and the window operator has a custom timestamp extractor that uses the >>>>> integer itself as the timestamp and should create windows of size 4 >> (that >>>>> is elements with timestamp 0-3 are one window, the next are the >> elements >>>>> with timestamp 4-8, and so on). 
Since the topology basically doubles >> the >>>>> elements form the source I would expect to get these windows: >>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3 >>>>> Window: 4, 4, 6, 6, 7, 7, 8, 8 >>>>> >>>>> The output is this, however: >>>>> Window: 0, 1, 2, 3, >>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7, >>>>> Window: 8, 9, 10, 11, >>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, >>>>> Window: 16, 17, 18, 19, >>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, >>>>> Window: 24, 25, 26, 27, >>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, >>>>> >>>>> The reason is that the elements simply arrive out-of-order. Imagine >> what >>>>> would happen if the elements actually arrived with some delay from >>>>> different operations. >>>>> >>>>> Now, on to the performance numbers. The proof-of-concept I created is >>>>> available here: >>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The >>>> basic >>>>> idea is that sources assign the current timestamp when emitting >> elements. >>>>> They also periodically emit watermarks that tell us that no elements >> with >>>>> an earlier timestamp will be emitted. The watermarks propagate through >>>> the >>>>> operators. The window operator looks at the timestamp of an element >> and >>>>> puts it into the buffer that corresponds to that window. When the >> window >>>>> operator receives a watermark it will look at the in-flight windows >>>>> (basically the buffers) and emit those windows where the window-end is >>>>> before the watermark. >>>>> >>>>> For measuring throughput I did the following: The source emits tuples >> of >>>>> the form ("tuple", 1) in an infinite loop. The window operator sums up >>>> the >>>>> tuples, thereby counting how many tuples the window operator can >> handle >>>> in >>>>> a given time window. 
There are two different implementations for the >>>>> summation: 1) simply summing up the values in a mapWindow(), there you >>>> get >>>>> a List of all tuples and simple iterate over it. 2) using sum(1), >> which >>>> is >>>>> implemented as a reduce() (that uses the pre-reducer optimisations). >>>>> >>>>> These are the performance numbers (Current is the current >> implementation, >>>>> Next is my proof-of-concept): >>>>> >>>>> Tumbling (1 sec): >>>>> - Current/Map: 1.6 mio >>>>> - Current/Reduce: 2 mio >>>>> - Next/Map: 2.2 mio >>>>> - Next/Reduce: 4 mio >>>>> >>>>> Sliding (5 sec, slide 1 sec): >>>>> - Current/Map: ca 3 mio (fluctuates a lot) >>>>> - Current/Reduce: No output >>>>> - Next/Map: ca 4 mio (fluctuates) >>>>> - Next/Reduce: 10 mio >>>>> >>>>> The Next/Reduce variant can basically scale indefinitely with window >> size >>>>> because the internal state does not rely on the number of elements >> (it is >>>>> just the current sum). The pre-reducer for sliding elements cannot >> handle >>>>> the amount of tuples, it produces no output. For the two Map variants >> the >>>>> performance fluctuates because they always keep all the elements in an >>>>> internal buffer before emission, this seems to tax the garbage >> collector >>>> a >>>>> bit and leads to random pauses. >>>>> >>>>> One thing that should be noted is that I had to disable the >> fake-element >>>>> emission thread, otherwise the Current versions would deadlock. >>>>> >>>>> So, I started working on this because I thought that out-of-order >>>>> processing would be necessary for correctness. And it is certainly, >> But >>>> the >>>>> proof-of-concept also shows that performance can be greatly improved. >>>>> >>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <[hidden email]> wrote: >>>>>> >>>>>> I agree lets separate these topics from each other so we can get >> faster >>>>>> resolution. >>>>>> >>>>>> There is already a state discussion in the thread we started with >> Paris. 
>>>>>> >>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <[hidden email]> >>>>> wrote: >>>>>> >>>>>>> I agree with supporting out-of-order out of the box :-), even if >> this >>>>> means >>>>>>> a major refactoring. This is the right time to refactor the >> streaming >>>>> API >>>>>>> before we pull it out of beta. I think that this is more important >>>> than >>>>> new >>>>>>> features in the streaming API, which can be prioritized once the >> API >>>> is >>>>> out >>>>>>> of beta (meaning, that IMO this is the right time to stall PRs >> until >>>> we >>>>>>> agree on the design). >>>>>>> >>>>>>> There are three sections in the document: windowing, state, and >> API. >>>> How >>>>>>> convoluted are those with each other? Can we separate the >> discussion >>>> or >>>>> do >>>>>>> we need to discuss those all together? I think part of the >> difficulty >>>> is >>>>>>> that we are discussing three design choices at once. >>>>>>> >>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning < >> [hidden email]> >>>>>>> wrote: >>>>>>> >>>>>>>> Out of order is ubiquitous in the real-world. Typically, what >>>>> happens is >>>>>>>> that businesses will declare a maximum allowable delay for >> delayed >>>>>>>> transactions and will commit to results when that delay is >> reached. >>>>>>>> Transactions that arrive later than this cutoff are collected >>>>> specially >>>>>>> as >>>>>>>> corrections which are reported/used when possible. >>>>>>>> >>>>>>>> Clearly, ordering can also be violated during processing, but if >> the >>>>> data >>>>>>>> is originally out of order the situation can't be repaired by any >>>>>>> protocol >>>>>>>> fixes that prevent transactions from becoming disordered but has >> to >>>>>>> handled >>>>>>>> at the data level. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek < >>>> [hidden email] >>>>>> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> I also don't like big changes but sometimes they are necessary. 
>>>> The >>>>>>>> reason >>>>>>>>> why I'm so adamant about out-of-order processing is that >>>>> out-of-order >>>>>>>>> elements are not some exception that occurs once in a while; >> they >>>>> occur >>>>>>>>> constantly in a distributed system. For example, in this: >>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the >>>> resulting >>>>>>>>> windows >>>>>>>>> are completely bogus because the current windowing system >> assumes >>>>>>>> elements >>>>>>>>> to globally arrive in order, which is simply not true. (The >>>> example >>>>>>> has a >>>>>>>>> source that generates increasing integers. Then these pass >>>> through a >>>>>>> map >>>>>>>>> and are unioned with the original DataStream before a window >>>>> operator.) >>>>>>>>> This simulates elements arriving from different operators at a >>>>>>> windowing >>>>>>>>> operator. The example is also DOP=1, I imagine this to get >> worse >>>>> with >>>>>>>>> higher DOP. >>>>>>>>> >>>>>>>>> What do you mean by costly? As I said, I have a >> proof-of-concept >>>>>>>> windowing >>>>>>>>> operator that can handle out-or-order elements. This is an >> example >>>>>>> using >>>>>>>>> the current Flink API: >>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8. >>>>>>>>> (It is an infinite source of tuples and a 5 second window >> operator >>>>> that >>>>>>>>> counts the tuples.) The first problem is that this code >> deadlocks >>>>>>> because >>>>>>>>> of the thread that emits fake elements. If I disable the fake >>>>> element >>>>>>>> code >>>>>>>>> it works, but the throughput using my mockup is 4 times higher >> . >>>> The >>>>>>> gap >>>>>>>>> widens dramatically if the window size increases. >>>>>>>>> >>>>>>>>> So, it actually increases performance (unless I'm making a >> mistake >>>>> in >>>>>>> my >>>>>>>>> explorations) and can handle elements that arrive out-of-order >>>>> (which >>>>>>>>> happens basically always in a real-world windowing use-cases). 
|
Yes, I am aware of this requirement and it would also be supported in my
proposed model.

The problem is that the "custom timestamp" feature gives the impression
that the elements would be windowed according to a user-timestamp. The
results, however, are wrong because of the assumption about elements
arriving in order. (This is what I was trying to show with my fancy
ASCII art and result output.)

On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <[hidden email]> wrote:

> Hi Aljoscha,
>
> I like that you are pushing in this direction. However, IMHO you
> misinterpret the current approach. It does not assume that tuples
> arrive in order; the current approach has no notion of a pre-defined
> order (for example, the order in which the events are created). There
> is only the notion of "arrival order" at the operator. From this
> "arrival order" perspective, the results are correct(!).
>
> Windowing in the current approach means, for example, "sum up an
> attribute of all events you *received* in the last 5 seconds". That is
> a different meaning than "sum up an attribute of all events that
> *occurred* in the last 5 seconds". Both queries are valid and Flink
> should support both IMHO.
>
> -Matthias
>
> On 06/25/2015 03:03 PM, Aljoscha Krettek wrote:
> > Yes, now this also processes about 3 mio elements (Window Size 5 sec,
> > Slide 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio.
> >
> > Performance is not my main concern, however. My concern is that the
> > current model assumes elements to arrive in order, which is simply
> > not true.
> >
> > In your code you have these lines for specifying the window:
> >     .window(Time.of(1l, TimeUnit.SECONDS))
> >     .every(Time.of(1l, TimeUnit.SECONDS))
> >
> > Although this semantically specifies a tumbling window of size 1 sec,
> > I'm afraid it uses the sliding window logic internally (because of
> > the .every()).
> >
> > In my tests I only have the first line.
> > > > > > On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <[hidden email]> wrote: > > > >> I'm very sorry, I had a bug in the InversePreReducer. It should be > >> fixed now. Can you please run it again? > >> > >> I also tried to reproduce some of your performance numbers, but I'm > >> getting only less than 1/10th of yours. For example, in the Tumbling > >> case, Current/Reduce produces only ~100000 for me. Do you have any > >> idea what I could be doing wrong? My code: > >> http://pastebin.com/zbEjmGhk > >> I am running it on a 2 GHz Core i7. > >> > >> Best regards, > >> Gabor > >> > >> > >> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <[hidden email]>: > >>> Hi, > >>> I also ran the tests on top of PR 856 (inverse reducer) now. The > results > >>> seem incorrect. When I insert a Thread.sleep(1) in the tuple source, > all > >>> the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 > sec) > >>> (Theoretically there would be 5000 tuples in 5 seconds but this is due > to > >>> overhead). These are the results for the inverse reduce optimisation: > >>> (Tuple 0,38) > >>> (Tuple 0,829) > >>> (Tuple 0,1625) > >>> (Tuple 0,2424) > >>> (Tuple 0,3190) > >>> (Tuple 0,3198) > >>> (Tuple 0,-339368) > >>> (Tuple 0,-1315725) > >>> (Tuple 0,-2932932) > >>> (Tuple 0,-5082735) > >>> (Tuple 0,-7743256) > >>> (Tuple 0,75701046) > >>> (Tuple 0,642829470) > >>> (Tuple 0,2242018381) > >>> (Tuple 0,5190708618) > >>> (Tuple 0,10060360311) > >>> (Tuple 0,-94254951) > >>> (Tuple 0,-219806321293) > >>> (Tuple 0,-1258895232699) > >>> (Tuple 0,-4074432596329) > >>> > >>> One line is one emitted window count. 
>>> This is what happens when I remove the Thread.sleep(1):
>>> (Tuple 0,660676)
>>> (Tuple 0,2553733)
>>> (Tuple 0,3542696)
>>> (Tuple 0,1)
>>> (Tuple 0,1107035)
>>> (Tuple 0,2549491)
>>> (Tuple 0,4100387)
>>> (Tuple 0,-8406583360092)
>>> (Tuple 0,-8406582150743)
>>> (Tuple 0,-8406580427190)
>>> (Tuple 0,-8406580427190)
>>> (Tuple 0,-8406580427190)
>>> (Tuple 0,6847279255682044995)
>>> (Tuple 0,6847279255682044995)
>>> (Tuple 0,-5390528042713628318)
>>> (Tuple 0,-5390528042711551780)
>>> (Tuple 0,-5390528042711551780)
>>>
>>> So at some point the pre-reducer seems to go haywire and does not
>>> recover from it. The good thing is that it does produce results now,
>>> where the previous Current/Reduce would simply hang and not produce
>>> any output.
>>>
>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <[hidden email]> wrote:
>>>
>>>> Hello,
>>>>
>>>> Aljoscha, can you please try the performance test of Current/Reduce
>>>> with the InversePreReducer in PR 856? (If you just call sum, it will
>>>> use an InversePreReducer.) It would be an interesting test, because
>>>> the inverse function optimization really depends on the stream being
>>>> ordered, and I think it has the potential of being faster than
>>>> Next/Reduce. Especially if the window size is much larger than the
>>>> slide size.
>>>>
>>>> Best regards,
>>>> Gabor
>>>>
>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <[hidden email]>:
>>>>> I think I'll have to elaborate a bit, so I created a
>>>>> proof-of-concept implementation of my ideas and ran some throughput
>>>>> measurements to alleviate concerns about performance.
>>>>>
>>>>> First, though, I want to highlight again why the current approach
>>>>> does not work with out-of-order elements (which, again, occur
>>>>> constantly due to the distributed nature of the system).
>>>>> This is the example I posted earlier:
>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. The plan
>>>>> looks like this:
>>>>>
>>>>>   +--+
>>>>>   |  | Source
>>>>>   +--+
>>>>>     |
>>>>>  +-----+
>>>>>  |     |
>>>>>  |   +--+
>>>>>  |   |  | Identity Map
>>>>>  |   +--+
>>>>>  |     |
>>>>>  +-----+
>>>>>     |
>>>>>   +--+
>>>>>   |  | Window
>>>>>   +--+
>>>>>     |
>>>>>   +--+
>>>>>   |  | Sink
>>>>>   +--+
>>>>>
>>>>> So all it does is pass the elements through an identity map and then
>>>>> merge them again before the window operator. The source emits
>>>>> ascending integers and the window operator has a custom timestamp
>>>>> extractor that uses the integer itself as the timestamp and should
>>>>> create windows of size 4 (that is, elements with timestamps 0-3 are
>>>>> one window, the next are the elements with timestamps 4-7, and so
>>>>> on). Since the topology basically doubles the elements from the
>>>>> source I would expect to get these windows:
>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3
>>>>> Window: 4, 4, 5, 5, 6, 6, 7, 7
>>>>>
>>>>> The output is this, however:
>>>>> Window: 0, 1, 2, 3,
>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
>>>>> Window: 8, 9, 10, 11,
>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
>>>>> Window: 16, 17, 18, 19,
>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
>>>>> Window: 24, 25, 26, 27,
>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
>>>>>
>>>>> The reason is that the elements simply arrive out-of-order. Imagine
>>>>> what would happen if the elements actually arrived with some delay
>>>>> from different operators.
>>>>>
>>>>> Now, on to the performance numbers. The proof-of-concept I created
>>>>> is available here:
>>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock.
>>>>> The basic idea is that sources assign the current timestamp when
>>>>> emitting elements. They also periodically emit watermarks that tell
>>>>> us that no elements with an earlier timestamp will be emitted. The
>>>>> watermarks propagate through the operators. The window operator
>>>>> looks at the timestamp of an element and puts it into the buffer
>>>>> that corresponds to that window. When the window operator receives a
>>>>> watermark it will look at the in-flight windows (basically the
>>>>> buffers) and emit those windows where the window-end is before the
>>>>> watermark.
>>>>>
>>>>> For measuring throughput I did the following: The source emits
>>>>> tuples of the form ("tuple", 1) in an infinite loop. The window
>>>>> operator sums up the tuples, thereby counting how many tuples the
>>>>> window operator can handle in a given time window. There are two
>>>>> different implementations for the summation: 1) simply summing up
>>>>> the values in a mapWindow(), where you get a List of all tuples and
>>>>> simply iterate over it; 2) using sum(1), which is implemented as a
>>>>> reduce() (that uses the pre-reducer optimisations).
>>>>>
>>>>> These are the performance numbers (Current is the current
>>>>> implementation, Next is my proof-of-concept):
>>>>>
>>>>> Tumbling (1 sec):
>>>>> - Current/Map: 1.6 mio
>>>>> - Current/Reduce: 2 mio
>>>>> - Next/Map: 2.2 mio
>>>>> - Next/Reduce: 4 mio
>>>>>
>>>>> Sliding (5 sec, slide 1 sec):
>>>>> - Current/Map: ca. 3 mio (fluctuates a lot)
>>>>> - Current/Reduce: no output
>>>>> - Next/Map: ca. 4 mio (fluctuates)
>>>>> - Next/Reduce: 10 mio
>>>>>
>>>>> The Next/Reduce variant can basically scale indefinitely with window
>>>>> size because the internal state does not rely on the number of
>>>>> elements (it is just the current sum). The pre-reducer for sliding
>>>>> windows cannot handle the amount of tuples; it produces no output.
>>>>> For the two Map variants the performance fluctuates because they
>>>>> always keep all the elements in an internal buffer before emission;
>>>>> this seems to tax the garbage collector a bit and leads to random
>>>>> pauses.
>>>>>
>>>>> One thing that should be noted is that I had to disable the
>>>>> fake-element emission thread, otherwise the Current versions would
>>>>> deadlock.
>>>>>
>>>>> So, I started working on this because I thought that out-of-order
>>>>> processing would be necessary for correctness. And it certainly is.
>>>>> But the proof-of-concept also shows that performance can be greatly
>>>>> improved.
>>>>>
>>>>> On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <[hidden email]> wrote:
>>>>>>
>>>>>> I agree, let's separate these topics from each other so we can get
>>>>>> faster resolution.
>>>>>>
>>>>>> There is already a state discussion in the thread we started with
>>>>>> Paris.
>>>>>>
>>>>>> On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <[hidden email]>
>>>>>> wrote:
>>>>>>
>>>>>>> I agree with supporting out-of-order out of the box :-), even if
>>>>>>> this means a major refactoring. This is the right time to refactor
>>>>>>> the streaming API before we pull it out of beta. I think that this
>>>>>>> is more important than new features in the streaming API, which
>>>>>>> can be prioritized once the API is out of beta (meaning, that IMO
>>>>>>> this is the right time to stall PRs until we agree on the design).
>>>>>>>
>>>>>>> There are three sections in the document: windowing, state, and
>>>>>>> API. How convoluted are those with each other? Can we separate the
>>>>>>> discussion or do we need to discuss those all together?
>>>>>>> I think part of the difficulty is that we are discussing three
>>>>>>> design choices at once.
>>>>>>>
>>>>>>> On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <[hidden email]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Out of order is ubiquitous in the real world. Typically, what
>>>>>>>> happens is that businesses will declare a maximum allowable delay
>>>>>>>> for delayed transactions and will commit to results when that
>>>>>>>> delay is reached. Transactions that arrive later than this cutoff
>>>>>>>> are collected specially as corrections which are reported/used
>>>>>>>> when possible.
>>>>>>>>
>>>>>>>> Clearly, ordering can also be violated during processing, but if
>>>>>>>> the data is originally out of order the situation can't be
>>>>>>>> repaired by any protocol fixes that prevent transactions from
>>>>>>>> becoming disordered; it has to be handled at the data level.
>>>>>>>>
>>>>>>>> On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek
>>>>>>>> <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> I also don't like big changes but sometimes they are necessary.
>>>>>>>>> The reason why I'm so adamant about out-of-order processing is
>>>>>>>>> that out-of-order elements are not some exception that occurs
>>>>>>>>> once in a while; they occur constantly in a distributed system.
>>>>>>>>> For example, in this:
>>>>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27 the
>>>>>>>>> resulting windows are completely bogus because the current
>>>>>>>>> windowing system assumes elements to globally arrive in order,
>>>>>>>>> which is simply not true. (The example has a source that
>>>>>>>>> generates increasing integers.
>>>>>>>>> Then these pass through a map and are unioned with the original
>>>>>>>>> DataStream before a window operator.) This simulates elements
>>>>>>>>> arriving from different operators at a windowing operator. The
>>>>>>>>> example is also DOP=1; I imagine this to get worse with higher
>>>>>>>>> DOP.
>>>>>>>>>
>>>>>>>>> What do you mean by costly? As I said, I have a proof-of-concept
>>>>>>>>> windowing operator that can handle out-of-order elements. This
>>>>>>>>> is an example using the current Flink API:
>>>>>>>>> https://gist.github.com/aljoscha/f8dce0691732e344bbe8.
>>>>>>>>> (It is an infinite source of tuples and a 5 second window
>>>>>>>>> operator that counts the tuples.) The first problem is that this
>>>>>>>>> code deadlocks because of the thread that emits fake elements.
>>>>>>>>> If I disable the fake-element code it works, but the throughput
>>>>>>>>> using my mockup is 4 times higher. The gap widens dramatically
>>>>>>>>> if the window size increases.
>>>>>>>>>
>>>>>>>>> So, it actually increases performance (unless I'm making a
>>>>>>>>> mistake in my explorations) and can handle elements that arrive
>>>>>>>>> out-of-order (which happens basically always in real-world
>>>>>>>>> windowing use-cases).
>>>>>>>>>
>>>>>>>>> On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <[hidden email]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> What I like a lot about Aljoscha's proposed design is that we
>>>>>>>>>> need no different code for "system time" vs. "event time". It
>>>>>>>>>> only differs in where the timestamps are assigned.
>>>>>>>>>>
>>>>>>>>>> The OOP approach also gives you the semantics of total ordering
>>>>>>>>>> without imposing merges on the streams.
>>>>>>>>>> On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax
>>>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> I agree that there should be multiple alternatives the user(!)
>>>>>>>>>>> can choose from. Partial out-of-order processing works for
>>>>>>>>>>> many/most aggregates. However, if you consider
>>>>>>>>>>> Event-Pattern-Matching, global ordering is necessary (even if
>>>>>>>>>>> the performance penalty might be high).
>>>>>>>>>>>
>>>>>>>>>>> I would also keep "system-time windows" as an alternative to
>>>>>>>>>>> "source-assigned ts-windows".
>>>>>>>>>>>
>>>>>>>>>>> It might also be interesting to consider the following paper
>>>>>>>>>>> for overlapping windows: "Resource sharing in continuous
>>>>>>>>>>> sliding-window aggregates"
>>>>>>>>>>> https://dl.acm.org/citation.cfm?id=1316720
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 06/23/2015 10:37 AM, Gyula Fóra wrote:
>>>>>>>>>>>> Hey,
>>>>>>>>>>>>
>>>>>>>>>>>> I think we should not block PRs unnecessarily if your
>>>>>>>>>>>> suggested changes might touch them at some point.
>>>>>>>>>>>>
>>>>>>>>>>>> Also I still think we should not put everything in the
>>>>>>>>>>>> DataStream because it will be a huge mess.
>>>>>>>>>>>>
>>>>>>>>>>>> Also we need to agree on the out-of-order processing, whether
>>>>>>>>>>>> we want it the way you proposed it (which is quite costly).
>>>>>>>>>>>> Another alternative approach that fits in the current
>>>>>>>>>>>> windowing is to filter out out-of-order events and apply a
>>>>>>>>>>>> special handling operator on them. This would be fairly
>>>>>>>>>>>> lightweight.
> >>>>>>>>>>>> > >>>>>>>>>>>> My point is that we need to consider some alternative > >>>>> solutions. > >>>>>>>> And > >>>>>>>>> we > >>>>>>>>>>>> should not block contributions along the way. > >>>>>>>>>>>> > >>>>>>>>>>>> Cheers > >>>>>>>>>>>> Gyula > >>>>>>>>>>>> > >>>>>>>>>>>> On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek < > >>>>>>>>> [hidden email]> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> The reason I posted this now is that we need to think > >> about > >>>>> the > >>>>>>>> API > >>>>>>>>>> and > >>>>>>>>>>>>> windowing before proceeding with the PRs of Gabor > >> (inverse > >>>>>>> reduce) > >>>>>>>>> and > >>>>>>>>>>>>> Gyula (removal of "aggregate" functions on DataStream). > >>>>>>>>>>>>> > >>>>>>>>>>>>> For the windowing, I think that the current model does > >> not > >>>>> work > >>>>>>>> for > >>>>>>>>>>>>> out-of-order processing. Therefore, the whole windowing > >>>>>>>>> infrastructure > >>>>>>>>>>> will > >>>>>>>>>>>>> basically have to be redone. Meaning also that any work > >> on > >>>>> the > >>>>>>>>>>>>> pre-aggregators or optimizations that we do now becomes > >>>>> useless. > >>>>>>>>>>>>> > >>>>>>>>>>>>> For the API, I proposed to restructure the interactions > >>>>> between > >>>>>>>> all > >>>>>>>>>> the > >>>>>>>>>>>>> different *DataStream classes and grouping/windowing. > >> (See > >>>>> API > >>>>>>>>> section > >>>>>>>>>>> of > >>>>>>>>>>>>> the doc I posted.) > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Mon, 22 Jun 2015 at 21:56 Gyula Fóra < > >>>> [hidden email] > >>>>>> > >>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi Aljoscha, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks for the nice summary, this is a very good > >>>> initiative. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I added some comments to the respective sections > >> (where I > >>>>> didnt > >>>>>>>>> fully > >>>>>>>>>>>>> agree > >>>>>>>>>>>>>> :).). 
> >>>>>>>>>>>>>> At some point I think it would be good to have a public > >>>>> hangout > >>>>>>>>>> session > >>>>>>>>>>>>> on > >>>>>>>>>>>>>> this, which could make a more dynamic discussion. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>> Gyula > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Aljoscha Krettek <[hidden email]> ezt írta > >> (időpont: > >>>>>>> 2015. > >>>>>>>>> jún. > >>>>>>>>>>>>> 22., > >>>>>>>>>>>>>> H, 21:34): > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hi, > >>>>>>>>>>>>>>> with people proposing changes to the streaming part I > >>>> also > >>>>>>>> wanted > >>>>>>>>> to > >>>>>>>>>>>>>> throw > >>>>>>>>>>>>>>> my hat into the ring. :D > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> During the last few months, while I was getting > >>>> acquainted > >>>>>>> with > >>>>>>>>> the > >>>>>>>>>>>>>>> streaming system, I wrote down some thoughts I had > >> about > >>>>> how > >>>>>>>>> things > >>>>>>>>>>>>> could > >>>>>>>>>>>>>>> be improved. Hopefully, they are in somewhat coherent > >>>> shape > >>>>>>> now, > >>>>>>>>> so > >>>>>>>>>>>>>> please > >>>>>>>>>>>>>>> have a look if you are interested in this: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>> > >>>> > >> > https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> This mostly covers: > >>>>>>>>>>>>>>> - Timestamps assigned at sources > >>>>>>>>>>>>>>> - Out-of-order processing of elements in window > >>>> operators > >>>>>>>>>>>>>>> - API design > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Please let me know what you think. Comment in the > >>>> document > >>>>> or > >>>>>>>> here > >>>>>>>>>> in > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>> mailing list. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> I have a PR in the makings that would introduce source > >>>>>>>> timestamps > >>>>>>>>>> and > >>>>>>>>>>>>>>> watermarks for keeping track of them. 
I also hacked a > >>>>>>>>>> proof-of-concept > >>>>>>>>>>>>>> of a > >>>>>>>>>>>>>>> windowing system that is able to process out-of-order > >>>>> elements > >>>>>>>>>> using a > >>>>>>>>>>>>>>> FlatMap operator. (It uses panes to perform efficient > >>>>>>>>>>>>> pre-aggregations.) > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>> Aljoscha > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>> > >> > > > > |
Sure. I picked this up. Using the current model for "occurrence time
semantics" does not work. I elaborated on this in the past many times (but nobody cared). It is important to make it clear to the user what semantics are supported. Claiming to support "sliding windows" doesn't mean anything; there are too many different semantics out there. :) On 06/25/2015 03:35 PM, Aljoscha Krettek wrote: > Yes, I am aware of this requirement and it would also be supported in my > proposed model. > > The problem is, that the "custom timestamp" feature gives the impression > that the elements would be windowed according to a user-timestamp. The > results, however, are wrong because of the assumption about elements > arriving in order. (This is what I was trying to show with my fancy ASCII > art and result output. > > On Thu, 25 Jun 2015 at 15:26 Matthias J. Sax <[hidden email]> > wrote: > >> Hi Aljoscha, >> >> I like that you are pushing in this direction. However, IMHO you >> misinterpreter the current approach. It does not assume that tuples >> arrive in-order; the current approach has no notion about a >> pre-defined-order (for example, the order in which the event are >> created). There is only the notion of "arrival-order" at the operator. >> From this "arrival-order" perspective, the result are correct(!). >> >> Windowing in the current approach means for example, "sum up an >> attribute of all events you *received* in the last 5 seconds". That is a >> different meaning that "sum up an attribute of all event that *occurred* >> in the last 5 seconds". Both queries are valid and Flink should support >> both IMHO. >> >> >> -Matthias >> >> >> >> On 06/25/2015 03:03 PM, Aljoscha Krettek wrote: >>> Yes, now this also processes about 3 mio Elements (Window Size 5 sec, >> Slide >>> 1 sec) but it still fluctuates a lot between 1 mio. and 5 mio. >>> >>> Performance is not my main concern, however. My concern is that the >> current >>> model assumes elements to arrive in order, which is simply not true. 
>>> >>> In your code you have these lines for specifying the window: >>> .window(Time.of(1l, TimeUnit.SECONDS)) >>> .every(Time.of(1l, TimeUnit.SECONDS)) >>> >>> Although this semantically specifies a tumbling window of size 1 sec I'm >>> afraid it uses the sliding window logic internally (because of the >>> .every()). >>> >>> In my tests I only have the first line. >>> >>> >>> On Thu, 25 Jun 2015 at 14:32 Gábor Gévay <[hidden email]> wrote: >>> >>>> I'm very sorry, I had a bug in the InversePreReducer. It should be >>>> fixed now. Can you please run it again? >>>> >>>> I also tried to reproduce some of your performance numbers, but I'm >>>> getting only less than 1/10th of yours. For example, in the Tumbling >>>> case, Current/Reduce produces only ~100000 for me. Do you have any >>>> idea what I could be doing wrong? My code: >>>> http://pastebin.com/zbEjmGhk >>>> I am running it on a 2 GHz Core i7. >>>> >>>> Best regards, >>>> Gabor >>>> >>>> >>>> 2015-06-25 12:31 GMT+02:00 Aljoscha Krettek <[hidden email]>: >>>>> Hi, >>>>> I also ran the tests on top of PR 856 (inverse reducer) now. The >> results >>>>> seem incorrect. When I insert a Thread.sleep(1) in the tuple source, >> all >>>>> the previous tests reported around 3600 tuples (Size 5 sec, Slide 1 >> sec) >>>>> (Theoretically there would be 5000 tuples in 5 seconds but this is due >> to >>>>> overhead). These are the results for the inverse reduce optimisation: >>>>> (Tuple 0,38) >>>>> (Tuple 0,829) >>>>> (Tuple 0,1625) >>>>> (Tuple 0,2424) >>>>> (Tuple 0,3190) >>>>> (Tuple 0,3198) >>>>> (Tuple 0,-339368) >>>>> (Tuple 0,-1315725) >>>>> (Tuple 0,-2932932) >>>>> (Tuple 0,-5082735) >>>>> (Tuple 0,-7743256) >>>>> (Tuple 0,75701046) >>>>> (Tuple 0,642829470) >>>>> (Tuple 0,2242018381) >>>>> (Tuple 0,5190708618) >>>>> (Tuple 0,10060360311) >>>>> (Tuple 0,-94254951) >>>>> (Tuple 0,-219806321293) >>>>> (Tuple 0,-1258895232699) >>>>> (Tuple 0,-4074432596329) >>>>> >>>>> One line is one emitted window count. 
This is what happens when I >> remove >>>>> the Thread.sleep(1): >>>>> (Tuple 0,660676) >>>>> (Tuple 0,2553733) >>>>> (Tuple 0,3542696) >>>>> (Tuple 0,1) >>>>> (Tuple 0,1107035) >>>>> (Tuple 0,2549491) >>>>> (Tuple 0,4100387) >>>>> (Tuple 0,-8406583360092) >>>>> (Tuple 0,-8406582150743) >>>>> (Tuple 0,-8406580427190) >>>>> (Tuple 0,-8406580427190) >>>>> (Tuple 0,-8406580427190) >>>>> (Tuple 0,6847279255682044995) >>>>> (Tuple 0,6847279255682044995) >>>>> (Tuple 0,-5390528042713628318) >>>>> (Tuple 0,-5390528042711551780) >>>>> (Tuple 0,-5390528042711551780) >>>>> >>>>> So at some point the pre-reducer seems to go haywire and does not >> recover >>>>> from it. The good thing is that it does produce results now, where the >>>>> previous Current/Reduce would simply hang and not produce any output. >>>>> >>>>> On Thu, 25 Jun 2015 at 12:02 Gábor Gévay <[hidden email]> wrote: >>>>> >>>>>> Hello, >>>>>> >>>>>> Aljoscha, can you please try the performance test of Current/Reduce >>>>>> with the InversePreReducer in PR 856? (If you just call sum, it will >>>>>> use an InversePreReducer.) It would be an interesting test, because >>>>>> the inverse function optimization really depends on the stream being >>>>>> ordered, and I think it has the potential of being faster then >>>>>> Next/Reduce. Especially if the window size is much larger than the >>>>>> slide size. >>>>>> >>>>>> Best regards, >>>>>> Gabor >>>>>> >>>>>> >>>>>> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek <[hidden email]>: >>>>>>> I think I'll have to elaborate a bit so I created a proof-of-concept >>>>>>> implementation of my Ideas and ran some throughput measurements to >>>>>>> alleviate concerns about performance. >>>>>>> >>>>>>> First, though, I want to highlight again why the current approach >> does >>>>>> not >>>>>>> work with out-of-order elements (which, again, occur constantly due >> to >>>>>> the >>>>>>> distributed nature of the system). 
This is the example I posted >>>> earlier: >>>>>>> https://gist.github.com/aljoscha/a367012646ab98208d27. The plan >> looks >>>>>> like >>>>>>> this: >>>>>>> >>>>>>> +--+ >>>>>>> | | Source >>>>>>> +--+ >>>>>>> | >>>>>>> +-----+ >>>>>>> | | >>>>>>> | +--+ >>>>>>> | | | Identity Map >>>>>>> | +--+ >>>>>>> | | >>>>>>> +-----+ >>>>>>> | >>>>>>> +--+ >>>>>>> | | Window >>>>>>> +--+ >>>>>>> | >>>>>>> | >>>>>>> +--+ >>>>>>> | | Sink >>>>>>> +--+ >>>>>>> >>>>>>> So all it does is pass the elements through an identity map and then >>>>>> merge >>>>>>> them again before the window operator. The source emits ascending >>>>>> integers >>>>>>> and the window operator has a custom timestamp extractor that uses >> the >>>>>>> integer itself as the timestamp and should create windows of size 4 >>>> (that >>>>>>> is elements with timestamp 0-3 are one window, the next are the >>>> elements >>>>>>> with timestamp 4-8, and so on). Since the topology basically doubles >>>> the >>>>>>> elements form the source I would expect to get these windows: >>>>>>> Window: 0, 0, 1, 1, 2, 2, 3, 3 >>>>>>> Window: 4, 4, 6, 6, 7, 7, 8, 8 >>>>>>> >>>>>>> The output is this, however: >>>>>>> Window: 0, 1, 2, 3, >>>>>>> Window: 4, 0, 1, 2, 3, 4, 5, 6, 7, >>>>>>> Window: 8, 9, 10, 11, >>>>>>> Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, >>>>>>> Window: 16, 17, 18, 19, >>>>>>> Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, >>>>>>> Window: 24, 25, 26, 27, >>>>>>> Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, >>>>>>> >>>>>>> The reason is that the elements simply arrive out-of-order. Imagine >>>> what >>>>>>> would happen if the elements actually arrived with some delay from >>>>>>> different operations. >>>>>>> >>>>>>> Now, on to the performance numbers. The proof-of-concept I created is >>>>>>> available here: >>>>>>> https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. 
The basic idea is that sources assign the current timestamp when emitting elements. They also periodically emit watermarks that tell us that no elements with an earlier timestamp will be emitted. The watermarks propagate through the operators. The window operator looks at the timestamp of an element and puts it into the buffer that corresponds to that window. When the window operator receives a watermark it looks at the in-flight windows (basically the buffers) and emits those windows whose window end is before the watermark.

For measuring throughput I did the following: the source emits tuples of the form ("tuple", 1) in an infinite loop. The window operator sums up the tuples, thereby counting how many tuples the window operator can handle in a given time window. There are two different implementations for the summation: 1) simply summing up the values in a mapWindow(), where you get a List of all tuples and simply iterate over it; 2) using sum(1), which is implemented as a reduce() (that uses the pre-reducer optimizations).

These are the performance numbers (Current is the current implementation, Next is my proof-of-concept):

Tumbling (1 sec):
- Current/Map: 1.6 mio
- Current/Reduce: 2 mio
- Next/Map: 2.2 mio
- Next/Reduce: 4 mio

Sliding (5 sec, slide 1 sec):
- Current/Map: ca. 3 mio (fluctuates a lot)
- Current/Reduce: no output
- Next/Map: ca. 4 mio (fluctuates)
- Next/Reduce: 10 mio

The Next/Reduce variant can basically scale indefinitely with window size because the internal state does not depend on the number of elements (it is just the current sum).
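The buffer-and-watermark mechanism described above can be sketched roughly like this (hypothetical names; the actual branch differs). Here a watermark t asserts that no element with timestamp < t is still in flight, so every window whose end is at or before t is complete and can be emitted:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EventTimeWindowOperator {
    private final long size;
    // window start -> elements buffered for the window [start, start + size)
    private final TreeMap<Long, List<Long>> inFlight = new TreeMap<>();

    public EventTimeWindowOperator(long size) {
        this.size = size;
    }

    // Buffer an element under the window its timestamp falls into.
    public void processElement(long timestamp) {
        long start = timestamp - (timestamp % size);
        inFlight.computeIfAbsent(start, k -> new ArrayList<>()).add(timestamp);
    }

    // A watermark t promises no element with timestamp < t will still arrive,
    // so emit every in-flight window that ends at or before t.
    public List<List<Long>> processWatermark(long watermark) {
        List<List<Long>> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, List<Long>>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> entry = it.next();
            long windowEnd = entry.getKey() + size;
            if (windowEnd <= watermark) {
                emitted.add(entry.getValue());
                it.remove();
            } else {
                break; // TreeMap iterates in order, so later windows end even later
            }
        }
        return emitted;
    }
}
```

Note that an element arriving "late" relative to already-buffered elements is no problem: it simply lands in the buffer of its own window, and nothing is emitted until a watermark declares that window complete.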
The pre-reducer for sliding windows cannot handle the amount of tuples; it produces no output. For the two Map variants the performance fluctuates because they always keep all the elements in an internal buffer before emission; this seems to tax the garbage collector a bit and leads to random pauses.

One thing that should be noted is that I had to disable the fake-element emission thread, otherwise the Current versions would deadlock.

So, I started working on this because I thought that out-of-order processing would be necessary for correctness. And it certainly is. But the proof-of-concept also shows that performance can be greatly improved.

On Wed, 24 Jun 2015 at 09:46 Gyula Fóra <[hidden email]> wrote:

I agree, let's separate these topics from each other so we can get faster resolution.

There is already a state discussion in the thread we started with Paris.

On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas <[hidden email]> wrote:

I agree with supporting out-of-order out of the box :-), even if this means a major refactoring. This is the right time to refactor the streaming API before we pull it out of beta. I think that this is more important than new features in the streaming API, which can be prioritized once the API is out of beta (meaning that, IMO, this is the right time to stall PRs until we agree on the design).

There are three sections in the document: windowing, state, and API. How convoluted are those with each other? Can we separate the discussion or do we need to discuss those all together?
I think part of the difficulty is that we are discussing three design choices at once.

On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning <[hidden email]> wrote:

Out-of-order data is ubiquitous in the real world. Typically, what happens is that businesses will declare a maximum allowable delay for delayed transactions and will commit to results when that delay is reached. Transactions that arrive later than this cutoff are collected specially as corrections which are reported/used when possible.

Clearly, ordering can also be violated during processing, but if the data is originally out of order the situation can't be repaired by any protocol fixes that prevent transactions from becoming disordered; it has to be handled at the data level.

On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek <[hidden email]> wrote:

I also don't like big changes, but sometimes they are necessary. The reason why I'm so adamant about out-of-order processing is that out-of-order elements are not some exception that occurs once in a while; they occur constantly in a distributed system. For example, in this: https://gist.github.com/aljoscha/a367012646ab98208d27 the resulting windows are completely bogus because the current windowing system assumes elements to globally arrive in order, which is simply not true. (The example has a source that generates increasing integers.
Then these pass through a map and are unioned with the original DataStream before a window operator.) This simulates elements arriving from different operators at a windowing operator. The example also runs with DOP=1; I imagine this gets worse with higher DOP.

What do you mean by costly? As I said, I have a proof-of-concept windowing operator that can handle out-of-order elements. This is an example using the current Flink API: https://gist.github.com/aljoscha/f8dce0691732e344bbe8. (It is an infinite source of tuples and a 5-second window operator that counts the tuples.) The first problem is that this code deadlocks because of the thread that emits fake elements. If I disable the fake-element code it works, but the throughput using my mockup is 4 times higher. The gap widens dramatically as the window size increases.

So, it actually increases performance (unless I'm making a mistake in my explorations) and can handle elements that arrive out-of-order (which happens basically always in real-world windowing use cases).

On Tue, 23 Jun 2015 at 12:51 Stephan Ewen <[hidden email]> wrote:

What I like a lot about Aljoscha's proposed design is that we need no different code for "system time" vs. "event time". It only differs in where the timestamps are assigned.

The OOP approach also gives you the semantics of total ordering without imposing merges on the streams.
On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax <[hidden email]> wrote:

I agree that there should be multiple alternatives the user(!) can choose from. Partial out-of-order processing works for many/most aggregates. However, if you consider event-pattern matching, global ordering is necessary (even if the performance penalty might be high).

I would also keep "system-time windows" as an alternative to "source-assigned ts-windows".

It might also be interesting to consider the following paper for overlapping windows: "Resource sharing in continuous sliding-window aggregates"
https://dl.acm.org/citation.cfm?id=1316720

-Matthias

On 06/23/2015 10:37 AM, Gyula Fóra wrote:

Hey,

I think we should not block PRs unnecessarily if your suggested changes might touch them at some point.

Also, I still think we should not put everything in the DataStream because it will be a huge mess.

Also, we need to agree on the out-of-order processing, whether we want it the way you proposed it (which is quite costly). Another alternative approach that fits in the current windowing is to filter out out-of-order events and apply a special handling operator on them. This would be fairly lightweight.
My point is that we need to consider some alternative solutions. And we should not block contributions along the way.

Cheers,
Gyula

On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek <[hidden email]> wrote:

The reason I posted this now is that we need to think about the API and windowing before proceeding with the PRs of Gabor (inverse reduce) and Gyula (removal of "aggregate" functions on DataStream).

For the windowing, I think that the current model does not work for out-of-order processing. Therefore, the whole windowing infrastructure will basically have to be redone, meaning also that any work on the pre-aggregators or optimizations that we do now becomes useless.

For the API, I proposed to restructure the interactions between all the different *DataStream classes and grouping/windowing. (See the API section of the doc I posted.)

On Mon, 22 Jun 2015 at 21:56 Gyula Fóra <[hidden email]> wrote:

Hi Aljoscha,

Thanks for the nice summary, this is a very good initiative.

I added some comments to the respective sections (where I didn't fully agree :).).
At some point I think it would be good to have a public hangout session on this, which could make for a more dynamic discussion.

Cheers,
Gyula

Aljoscha Krettek <[hidden email]> wrote (on Mon, 22 Jun 2015 at 21:34):

Hi,
with people proposing changes to the streaming part I also wanted to throw my hat into the ring. :D

During the last few months, while I was getting acquainted with the streaming system, I wrote down some thoughts I had about how things could be improved. Hopefully they are in somewhat coherent shape now, so please have a look if you are interested in this:

https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing

This mostly covers:
- Timestamps assigned at sources
- Out-of-order processing of elements in window operators
- API design

Please let me know what you think. Comment in the document or here on the mailing list.

I have a PR in the makings that would introduce source timestamps and watermarks for keeping track of them.
I also hacked a proof-of-concept of a windowing system that is able to process out-of-order elements using a FlatMap operator. (It uses panes to perform efficient pre-aggregations.)

Cheers,
Aljoscha
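A rough illustration of the pane idea mentioned here (hypothetical names, simplified to sums, not the proof-of-concept code): a sliding window of length n × slide is split into n contiguous panes of one slide each, every element is pre-aggregated into exactly one pane, and a full window result just combines the n panes it spans. Each element is therefore touched once instead of once per overlapping window:

```java
import java.util.TreeMap;

public class PaneSumWindows {
    private final long slide;
    private final int panesPerWindow; // window length = slide * panesPerWindow
    private final TreeMap<Long, Long> paneSums = new TreeMap<>(); // pane start -> partial sum

    public PaneSumWindows(long slide, int panesPerWindow) {
        this.slide = slide;
        this.panesPerWindow = panesPerWindow;
    }

    // Each element updates exactly one pane-sized partial aggregate.
    public void add(long timestamp, long value) {
        long paneStart = timestamp - (timestamp % slide);
        paneSums.merge(paneStart, value, Long::sum);
    }

    // The window [windowEnd - slide * panesPerWindow, windowEnd)
    // is the combination of its panesPerWindow panes.
    public long windowSum(long windowEnd) {
        long sum = 0;
        for (int i = 1; i <= panesPerWindow; i++) {
            Long pane = paneSums.get(windowEnd - i * slide);
            if (pane != null) {
                sum += pane;
            }
        }
        return sum;
    }
}
```

Because panes are keyed by timestamp, out-of-order elements simply update the pane they belong to; only pane results for slides that a watermark has declared complete would be combined and emitted in a real operator.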