As far as I see in [1], Peter's/Gyula's suggestion is what InfoSphere
Streams does: symmetric hash join. From [1]:

"When a tuple is received on an input port, it is inserted into the window
corresponding to the input port, which causes the window to trigger. As
part of the trigger processing, the tuple is compared against all tuples
inside the window of the opposing input port. If the tuples match, then an
output tuple will be produced for each match. If at least one output was
generated, a window punctuation will be generated after all the outputs."

Cheers,
Asterios

[1] http://www-01.ibm.com/support/knowledgecenter/#!/SSCRJU_3.2.1/com.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc/doc/join.html

On Thu, Apr 9, 2015 at 1:30 PM, Matthias J. Sax <[hidden email]> wrote:

> Hi Paris,
>
> thanks for the pointer to the Naiad paper. That is quite interesting.
>
> The paper I mentioned [1] does not describe the semantics in detail; it
> is more about the implementation of the stream joins. However, it uses
> the same semantics (from my understanding) as proposed by Gyula.
>
> -Matthias
>
> [1] Kang, Naughton, Viglas. "Evaluating Window Joins over Unbounded
> Streams". ICDE 2003.
>
> On 04/07/2015 12:38 PM, Paris Carbone wrote:
> > Hello Matthias,
> >
> > Sure, ordering guarantees are indeed a tricky thing; I recall having
> > that discussion back in TU Berlin. Bear in mind though that DataStream,
> > our abstract data type, represents a *partitioned* unbounded sequence
> > of events. There are no *global* ordering guarantees made whatsoever in
> > that model across partitions. If you see it more generally, there are
> > many "race conditions" in a distributed execution graph of vertices
> > that process multiple inputs asynchronously, especially when you add
> > joins and iterations into the mix (how do you deal with reprocessing
> > "old" tuples that iterate in the graph?). Btw, have you checked the
> > Naiad paper [1]? Stephan cited it a while ago and it is quite relevant
> > to that discussion.
> >
> > Also, can you cite the paper with the joining semantics you are
> > referring to? That would be of good help, I think.
> >
> > Paris
> >
> > [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
> >
> > On 07 Apr 2015, at 11:50, Matthias J. Sax <[hidden email]> wrote:
> >
> > Hi @all,
> >
> > please keep me in the loop for this work. I am highly interested and I
> > want to help on it.
> >
> > My initial thoughts are as follows:
> >
> > 1) Currently, system timestamps are used, and the suggested approach
> > can be seen as state of the art (there is actually a research paper
> > using the exact same join semantics). Of course, the current approach
> > is inherently non-deterministic. The advantage is that there is no
> > overhead in keeping track of the order of records, and the latency
> > should be very low. (Additionally, state recovery is simplified:
> > because the processing is inherently non-deterministic, recovery can
> > be done with relaxed guarantees.)
> >
> > 2) The user should be able to "switch on" deterministic processing,
> > i.e., records are timestamped (either externally when generated, or
> > timestamped at the sources). Because deterministic processing adds
> > some overhead, the user should opt into it actively.
> > In this case, the order must be preserved in each redistribution step
> > (merging is sufficient if order is preserved within each incoming
> > channel). Furthermore, deterministic processing can be achieved by
> > sound window semantics (and there is a bunch of them). Even for
> > single-stream windows it's a tricky problem; for join windows it's
> > even harder. From my point of view, it is less important which
> > semantics are chosen; however, the user must be aware of how it works.
> > The most tricky part of deterministic processing is dealing with
> > duplicate timestamps (which cannot be avoided). The timestamping of
> > (intermediate) result tuples is also an important question to be
> > answered.
> >
> > -Matthias
> >
> > On 04/07/2015 11:37 AM, Gyula Fóra wrote:
> > Hey,
> >
> > I agree with Kostas: if we define the exact semantics of how this
> > works, this is not more ad hoc than any other stateful operator with
> > multiple inputs. (And I don't think any other system supports
> > something similar.)
> >
> > We need to make some design choices that are similar to the issues we
> > had for windowing. We need to choose how we want to evaluate the
> > windowing policies (global or local) because that affects what kind of
> > policies can be parallel, but I can work on these things.
> >
> > I think this is an amazing feature, so I wouldn't necessarily rush the
> > implementation for 0.9 though.
> >
> > And thanks for helping write these down.
> >
> > Gyula
> >
> > On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas <[hidden email]> wrote:
> >
> > Yes, we should write these semantics down. I volunteer to help.
> >
> > I don't think that this is very ad hoc. The semantics are basically
> > the following. Assuming an arriving element from the left side:
> > (1) We find the right-side matches
> > (2) We insert the left-side arrival into the left window
> > (3) We recompute the left window
> > We need to see whether right-window re-computation needs to be
> > triggered as well. I think that this way of joining streams is also
> > what the symmetric hash join algorithms were meant to support.
> >
> > Kostas
> >
> > On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <[hidden email]> wrote:
> >
> > Is the approach of joining an element at a time from one input against
> > a window on the other input not a bit arbitrary?
> >
> > This just joins whatever currently happens to be in the window by the
> > time the single element arrives - that is a bit unpredictable, right?
> >
> > As a more general point: the whole semantics of windowing and when
> > windows are triggered are a bit ad hoc now. It would be really good to
> > start formalizing that a bit and put it down somewhere. Users need to
> > be able to clearly understand and predict the output.
> >
> > On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <[hidden email]> wrote:
> >
> > I think it should be possible to make this compatible with the
> > .window().every() calls. Maybe if there is some trigger set in "every"
> > we would not join that stream 1 by 1 but every so many elements. The
> > problem here is that the window and every in this case are very, very
> > different from the normal windowing semantics. The window would define
> > the join window for each element of the other stream, while every
> > would define how often I join this stream with the other one.
> >
> > We need to think to make this intuitive.
> >
> > On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <[hidden email]> wrote:
> >
> > That would be really neat. The problem I see there is that we do not
> > distinguish between dataStream.window() and dataStream.window().every()
> > currently; they both return WindowedDataStreams, and TriggerPolicies
> > of the every call do not make much sense in this setting (in fact,
> > practically, the trigger is always set to a count of one).
> >
> > But of course we could make it in a way that we check that the
> > eviction should be either null or a count of 1; in every other case we
> > throw an exception while building the JobGraph.
> >
> > On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <[hidden email]> wrote:
> >
> > Or you could define it like this:
> >
> > stream_A = a.window(...)
> > stream_B = b.window(...)
> >
> > stream_A.join(stream_B).where().equals().with()
> >
> > So a join would just be a join of two WindowedDataStreams. This would
> > neatly move the windowing stuff into one place.
> >
> > On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <[hidden email]> wrote:
> >
> > Big +1 for the proposal from Peter and Gyula. I'm really for bringing
> > the windowing and window join APIs in sync.
> >
> > On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <[hidden email]> wrote:
> >
> > Hey guys,
> >
> > As Aljoscha highlighted earlier, the current window join semantics in
> > the streaming API don't follow the changes in the windowing API. More
> > precisely, we currently only support joins over time windows of equal
> > size on both streams. The reason for this is that we now take a window
> > of each of the two streams and do joins over these pairs. This would
> > be a blocking operation if the windows are not closed at exactly the
> > same time (and since we don't want this, we only allow time windows).
> >
> > I talked with Peter, who came up with the initial idea of an
> > alternative approach for stream joins, which works as follows:
> >
> > Instead of pairing windows for joins, we do element-against-window
> > joins. What this means is that whenever we receive an element from one
> > of the streams, we join this element with the current window (this
> > window is constantly updated) of the other stream.
> > This is non-blocking on any window definitions, as we don't have to
> > wait for windows to be completed, and we can use this with any of our
> > predefined policies like Time.of(...), Count.of(...), Delta.of(...).
> >
> > Additionally, this also allows some very flexible ways of defining
> > window joins. With this we could also define grouped windowing inside
> > of a join. An example of this would be: join all elements of Stream1
> > with the last 5 elements, by a given window key, of Stream2 on some
> > join key.
> >
> > This feature can be easily implemented over the current operators, so
> > I already have a working prototype for the simple non-grouped case.
> > My only concern is the API; the best thing I could come up with is
> > something like this:
> >
> > stream_A.join(stream_B).onWindow(windowDefA, windowDefB)
> >     .by(windowKey1, windowKey2).where(...).equalTo(...).with(...)
> >
> > (the user can omit the "by" and "with" calls)
> >
> > I think this new approach would be worthy of our "flexible windowing",
> > in contrast with the current approach.
> >
> > Regards,
> > Gyula
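For illustration: the InfoSphere behavior Asterios quotes above (insert the arriving tuple into its own window, then probe the opposing window) is the classic symmetric join pattern. Below is a minimal, self-contained sketch with count-based windows; all class and method names are invented for this example, and a real symmetric hash join would index each window by key in a hash table instead of scanning it:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

/** Sketch of a symmetric join over two count-based windows (names illustrative). */
public class SymmetricWindowJoin<L, R, K> {

    private final Function<L, K> leftKey;
    private final Function<R, K> rightKey;
    private final int windowSize; // count-based eviction, for brevity

    private final Deque<L> leftWindow = new ArrayDeque<>();
    private final Deque<R> rightWindow = new ArrayDeque<>();

    public SymmetricWindowJoin(Function<L, K> leftKey, Function<R, K> rightKey,
                               int windowSize) {
        this.leftKey = leftKey;
        this.rightKey = rightKey;
        this.windowSize = windowSize;
    }

    /** Left tuple arrives: insert into the left window, then probe the right window. */
    public List<R> onLeft(L tuple) {
        leftWindow.addLast(tuple);
        if (leftWindow.size() > windowSize) leftWindow.removeFirst();
        List<R> matches = new ArrayList<>();
        for (R r : rightWindow) {
            if (leftKey.apply(tuple).equals(rightKey.apply(r))) matches.add(r);
        }
        return matches;
    }

    /** Symmetric case for the right input. */
    public List<L> onRight(R tuple) {
        rightWindow.addLast(tuple);
        if (rightWindow.size() > windowSize) rightWindow.removeFirst();
        List<L> matches = new ArrayList<>();
        for (L l : leftWindow) {
            if (rightKey.apply(tuple).equals(leftKey.apply(l))) matches.add(l);
        }
        return matches;
    }
}
```

Eviction here is count-based only to keep the sketch short; a time-based policy would evict by timestamp instead.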
Hey all,
We have spent some time with Asterios, Paris, and Jonas to finalize the
windowing semantics (both the current features and the window join), and I
think we have come up with a very clear picture.

We will write down the proposed semantics and publish them to the wiki
next week.

Cheers,
Gyula

On Thu, Apr 16, 2015 at 5:50 PM, Asterios Katsifodimos <[hidden email]> wrote:

> As far as I see in [1], Peter's/Gyula's suggestion is what InfoSphere
> Streams does: symmetric hash join.
Perfect! I am eager to see what you came up with!
On Sat, Apr 18, 2015 at 2:00 PM, Gyula Fóra <[hidden email]> wrote: > Hey all, > > We have spent some time with Asterios, Paris and Jonas to finalize the > windowing semantics (both the current features and the window join), and I > think we made very have come up with a very clear picture. > > We will write down the proposed semantics and publish it to the wiki next > week. > > Cheers, > Gyula > > On Thu, Apr 16, 2015 at 5:50 PM, Asterios Katsifodimos < > [hidden email]> wrote: > > > As far as I see in [1], Peter's/Gyula's suggestion is what Infosphere > > Streams does: symmetric hash join. > > > > From [1]: > > "When a tuple is received on an input port, it is inserted into the > window > > corresponding to the input port, which causes the window to trigger. As > > part of the trigger processing, the tuple is compared against all tuples > > inside the window of the opposing input port. If the tuples match, then > an > > output tuple will be produced for each match. If at least one output was > > generated, a window punctuation will be generated after all the outputs." > > > > Cheers, > > Asterios > > > > [1] > > > > > http://www-01.ibm.com/support/knowledgecenter/#!/SSCRJU_3.2.1/com.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc/doc/join.html > > > > > > > > On Thu, Apr 9, 2015 at 1:30 PM, Matthias J. Sax < > > [hidden email]> wrote: > > > > > Hi Paris, > > > > > > thanks for the pointer to the Naiad paper. That is quite interesting. > > > > > > The paper I mentioned [1], does not describe the semantics in detail; > it > > > is more about the implementation for the stream-joins. However, it uses > > > the same semantics (from my understanding) as proposed by Gyula. > > > > > > -Matthias > > > > > > [1] Kang, Naughton, Viglas. "Evaluationg Window Joins over Unbounded > > > Streams". VLDB 2002. 
> > > > > > > > > > > > On 04/07/2015 12:38 PM, Paris Carbone wrote: > > > > Hello Matthias, > > > > > > > > Sure, ordering guarantees are indeed a tricky thing, I recall having > > > that discussion back in TU Berlin. Bear in mind thought that > DataStream, > > > our abstract data type, represents a *partitioned* unbounded sequence > of > > > events. There are no *global* ordering guarantees made whatsoever in > that > > > model across partitions. If you see it more generally there are many > > “race > > > conditions” in a distributed execution graph of vertices that process > > > multiple inputs asynchronously, especially when you add joins and > > > iterations into the mix (how do you deal with reprocessing “old” tuples > > > that iterate in the graph). Btw have you checked the Naiad paper [1]? > > > Stephan cited a while ago and it is quite relevant to that discussion. > > > > > > > > Also, can you cite the paper with the joining semantics you are > > > referring to? That would be of good help I think. > > > > > > > > Paris > > > > > > > > [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf > > > > > > > > <https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf> > > > > > > > > <https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf> > > > > On 07 Apr 2015, at 11:50, Matthias J. Sax < > > [hidden email] > > > <mailto:[hidden email]>> wrote: > > > > > > > > Hi @all, > > > > > > > > please keep me in the loop for this work. I am highly interested and > I > > > > want to help on it. > > > > > > > > My initial thoughts are as follows: > > > > > > > > 1) Currently, system timestamps are used and the suggested approach > can > > > > be seen as state-of-the-art (there is actually a research paper using > > > > the exact same join semantic). Of course, the current approach is > > > > inherently non-deterministic. The advantage is, that there is no > > > > overhead in keeping track of the order of records and the latency > > should > > > > be very low. 
(Additionally, state-recovery is simplified. Because, > the > > > > processing in inherently non-deterministic, recovery can be done with > > > > relaxed guarantees). > > > > > > > > 2) The user should be able to "switch on" deterministic processing, > > > > ie, records are timestamped (either externally when generated, or > > > > timestamped at the sources). Because deterministic processing adds > some > > > > overhead, the user should decide for it actively. > > > > In this case, the order must be preserved in each re-distribution > step > > > > (merging is sufficient, if order is preserved within each incoming > > > > channel). Furthermore, deterministic processing can be achieved by > > sound > > > > window semantics (and there is a bunch of them). Even for > > > > single-stream-windows it's a tricky problem; for join-windows it's > even > > > > harder. From my point of view, it is less important which semantics > are > > > > chosen; however, the user must be aware how it works. The most tricky > > > > part for deterministic processing, is to deal with duplicate > timestamps > > > > (which cannot be avoided). The timestamping for (intermediate) result > > > > tuples, is also an important question to be answered. > > > > > > > > > > > > -Matthias > > > > > > > > > > > > On 04/07/2015 11:37 AM, Gyula Fóra wrote: > > > > Hey, > > > > > > > > I agree with Kostas, if we define the exact semantics how this works, > > > this > > > > is not more ad-hoc than any other stateful operator with multiple > > inputs. > > > > (And I don't think any other system support something similar) > > > > > > > > We need to make some design choices that are similar to the issues we > > had > > > > for windowing. We need to chose how we want to evaluate the windowing > > > > policies (global or local) because that affects what kind of policies > > can > > > > be parallel, but I can work on these things. 
> > > > > > > > I think this is an amazing feature, so I wouldn't necessarily rush > the > > > > implementation for 0.9 though. > > > > > > > > And thanks for helping writing these down. > > > > > > > > Gyula > > > > > > > > On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas <[hidden email] > > > <mailto:[hidden email]>> wrote: > > > > > > > > Yes, we should write these semantics down. I volunteer to help. > > > > > > > > I don't think that this is very ad-hoc. The semantics are basically > the > > > > following. Assuming an arriving element from the left side: > > > > (1) We find the right-side matches > > > > (2) We insert the left-side arrival into the left window > > > > (3) We recompute the left window > > > > We need to see whether right window re-computation needs to be > > triggered > > > as > > > > well. I think that this way of joining streams is also what the > > symmetric > > > > hash join algorithms were meant to support. > > > > > > > > Kostas > > > > > > > > > > > > On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <[hidden email] > > <mailto: > > > [hidden email]>> wrote: > > > > > > > > Is the approach of joining an element at a time from one input > against > > a > > > > window on the other input not a bit arbitrary? > > > > > > > > This just joins whatever currently happens to be the window by the > time > > > > the > > > > single element arrives - that is a bit non-predictable, right? > > > > > > > > As a more general point: The whole semantics of windowing and when > they > > > > are > > > > triggered are a bit ad-hoc now. It would be really good to start > > > > formalizing that a bit and > > > > put it down somewhere. Users need to be able to clearly understand > and > > > > how > > > > to predict the output. 
On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <[hidden email]> wrote:

I think it should be possible to make this compatible with the .window().every() calls. Maybe if there is some trigger set in "every" we would not join that stream 1 by 1 but every so many elements. The problem here is that the window and every in this case are very different from the normal windowing semantics: the window would define the join window for each element of the other stream, while every would define how often this stream is joined with the other one.

We need to think about how to make this intuitive.

On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <[hidden email]> wrote:

That would be really neat. The problem I see there is that we do not currently distinguish between dataStream.window() and dataStream.window().every(); they both return WindowedDataStreams, and the TriggerPolicies of the every call do not make much sense in this setting (in fact, the trigger is practically always set to a count of one).

But of course we could make it in a way that we check that the eviction is either null or a count of 1, and in every other case throw an exception while building the JobGraph.

On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <[hidden email]> wrote:

Or you could define it like this:

stream_A = a.window(...)
stream_B = b.window(...)

stream_A.join(stream_B).where().equals().with()

So a join would just be a join of two WindowedDataStreams. This would neatly move the windowing stuff into one place.

On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <[hidden email]> wrote:

Big +1 for the proposal from Peter and Gyula. I'm really for bringing the windowing and window join APIs in sync.

On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <[hidden email]> wrote:

Hey guys,

As Aljoscha has highlighted earlier, the current window join semantics in the streaming API don't follow the changes in the windowing API. More precisely, we currently only support joins over time windows of equal size on both streams. The reason for this is that we now take a window of each of the two streams and do joins over these pairs. This would be a blocking operation if the windows are not closed at exactly the same time (and since we don't want this, we only allow time windows).

I talked with Peter, who came up with the initial idea of an alternative approach for stream joins, which works as follows: instead of pairing windows for joins, we do element-against-window joins. What this means is that whenever we receive an element from one of the streams, we join this element with the current window (this window is constantly updated) of the other stream. This is non-blocking for any window definition, as we don't have to wait for windows to be completed, and we can use it with any of our predefined policies like Time.of(...), Count.of(...), Delta.of(...).

Additionally, this also allows a very flexible way of defining window joins. With this we could also define grouped windowing inside of a join. An example of this would be: join all elements of Stream1 with the last 5 elements, by a given window key, of Stream2 on some join key.

This feature can easily be implemented over the current operators, so I already have a working prototype for the simple non-grouped case. My only concern is the API; the best thing I could come up with is something like this:

stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1, windowKey2).where(...).equalTo(...).with(...)

(the user can omit the "by" and "with" calls)

I think this new approach would be worthy of our "flexible windowing", in contrast with the current approach.

Regards,
Gyula
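The element-against-window join proposed here is easy to picture with plain data structures. The sketch below is hypothetical (not Flink code): two buffers play the role of the per-stream windows, a count-based eviction stands in for the Time/Count/Delta policies, and each arriving element probes the opposite stream's current window before being inserted into its own:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Minimal sketch of an element-against-window join: an arriving element is
// probed against the other stream's current window, then inserted into its
// own window. Count-based eviction stands in for any real window policy.
class ElementVsWindowJoin<A, B, K> {
    private final Deque<A> windowA = new ArrayDeque<>();
    private final Deque<B> windowB = new ArrayDeque<>();
    private final Function<A, K> keyA;
    private final Function<B, K> keyB;
    private final int maxA, maxB; // stand-ins for the real window policies

    ElementVsWindowJoin(Function<A, K> keyA, Function<B, K> keyB, int maxA, int maxB) {
        this.keyA = keyA;
        this.keyB = keyB;
        this.maxA = maxA;
        this.maxB = maxB;
    }

    // Non-blocking: the element joins whatever B's window holds right now.
    List<String> onElementA(A a) {
        List<String> out = new ArrayList<>();
        for (B b : windowB) {
            if (keyA.apply(a).equals(keyB.apply(b))) out.add(a + "|" + b);
        }
        windowA.addLast(a);
        while (windowA.size() > maxA) windowA.removeFirst(); // evict oldest
        return out;
    }

    // Symmetric handling for the other input.
    List<String> onElementB(B b) {
        List<String> out = new ArrayList<>();
        for (A a : windowA) {
            if (keyB.apply(b).equals(keyA.apply(a))) out.add(a + "|" + b);
        }
        windowB.addLast(b);
        while (windowB.size() > maxB) windowB.removeFirst(); // evict oldest
        return out;
    }
}
```

Because an element only ever joins against whatever the opposite window contains at its arrival time, the operator never blocks on window alignment, which is the non-blocking property claimed above (and also the source of the non-determinism discussed later in the thread).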
Did anyone read these:
https://cloud.google.com/dataflow/model/windowing and
https://cloud.google.com/dataflow/model/triggers ?

The semantics seem very straightforward and I'm sure the google guys spent some time thinking this through. :D

On Mon, Apr 20, 2015 at 3:43 PM, Stephan Ewen <[hidden email]> wrote:

Perfect! I am eager to see what you came up with!

On Sat, Apr 18, 2015 at 2:00 PM, Gyula Fóra <[hidden email]> wrote:

Hey all,

We have spent some time with Asterios, Paris and Jonas to finalize the windowing semantics (both the current features and the window join), and I think we have come up with a very clear picture.

We will write down the proposed semantics and publish it to the wiki next week.

Cheers,
Gyula
On 04/07/2015 12:38 PM, Paris Carbone wrote:

Also, can you cite the paper with the joining semantics you are referring to? That would be of good help I think.

Paris

[1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf

On 07 Apr 2015, at 11:50, Matthias J. Sax <[hidden email]> wrote:

Hi @all,

please keep me in the loop for this work. I am highly interested and I want to help on it.
My initial thoughts are as follows:

1) Currently, system timestamps are used and the suggested approach can be seen as state-of-the-art (there is actually a research paper using the exact same join semantics). Of course, the current approach is inherently non-deterministic. The advantage is that there is no overhead in keeping track of the order of records, and the latency should be very low. (Additionally, state recovery is simplified: because the processing is inherently non-deterministic, recovery can be done with relaxed guarantees.)

2) The user should be able to "switch on" deterministic processing, i.e., records are timestamped (either externally when generated, or timestamped at the sources). Because deterministic processing adds some overhead, the user should opt into it actively. In this case, the order must be preserved in each re-distribution step (merging is sufficient if order is preserved within each incoming channel). Furthermore, deterministic processing can be achieved by sound window semantics (and there is a bunch of them). Even for single-stream windows it's a tricky problem; for join windows it's even harder. From my point of view, it is less important which semantics are chosen; however, the user must be aware of how they work. The trickiest part of deterministic processing is dealing with duplicate timestamps (which cannot be avoided). The timestamping of (intermediate) result tuples is also an important question to be answered.
-Matthias

On 04/07/2015 11:37 AM, Gyula Fóra wrote:

Hey,

I agree with Kostas: if we define the exact semantics of how this works, it is not more ad-hoc than any other stateful operator with multiple inputs. (And I don't think any other system supports something similar.)

We need to make some design choices that are similar to the issues we had for windowing. We need to choose how we want to evaluate the windowing policies (global or local), because that affects what kinds of policies can be parallel, but I can work on these things.

I think this is an amazing feature, so I wouldn't necessarily rush the implementation for 0.9 though.

And thanks for helping write these down.

Gyula

On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas <[hidden email]> wrote:

Yes, we should write these semantics down. I volunteer to help.

I don't think that this is very ad-hoc. The semantics are basically the following, assuming an arriving element from the left side:
(1) We find the right-side matches
(2) We insert the left-side arrival into the left window
(3) We recompute the left window
We need to see whether right-window re-computation needs to be triggered as well. I think that this way of joining streams is also what the symmetric hash join algorithms were meant to support.

Kostas

On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <[hidden email]> wrote:

Is the approach of joining an element at a time from one input against a window on the other input not a bit arbitrary?
This just joins whatever currently happens to be the window by the time the single element arrives - that is a bit non-predictable, right?

As a more general point: the whole semantics of windowing and when windows are triggered are a bit ad-hoc now. It would be really good to start formalizing that a bit and put it down somewhere. Users need to be able to clearly understand and predict the output.
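Matthias's point (2) above - that deterministic processing only needs merging, provided order is preserved within each incoming channel - amounts to a k-way merge on timestamps. Below is a minimal batch-style sketch under that assumption (illustrative names; a real streaming merge would additionally have to wait whenever a channel is temporarily empty, which is where the latency overhead he mentions comes from):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

// Sketch of an order-preserving merge: if each incoming channel delivers
// records in timestamp order, repeatedly forwarding the smallest channel
// head yields one globally ordered stream.
class OrderedMerge {
    record Rec(long ts, String value) {}

    static List<Rec> merge(List<Queue<Rec>> channels) {
        record Head(Rec rec, int channel) {}
        PriorityQueue<Head> pq =
            new PriorityQueue<>(Comparator.comparingLong((Head h) -> h.rec().ts()));
        // Seed the priority queue with the head of every non-empty channel.
        for (int i = 0; i < channels.size(); i++) {
            Rec r = channels.get(i).poll();
            if (r != null) pq.add(new Head(r, i));
        }
        List<Rec> out = new ArrayList<>();
        while (!pq.isEmpty()) {
            Head h = pq.poll();
            out.add(h.rec());
            // Refill from the channel that just emitted, keeping one head each.
            Rec next = channels.get(h.channel()).poll();
            if (next != null) pq.add(new Head(next, h.channel()));
        }
        return out;
    }
}
```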
Interesting read. Thanks for the pointer.
Take-home message (in my understanding):

- they support wall-clock, attribute-ts, and count windows
  -> default is attribute-ts (and not wall-clock as in Flink)
  -> it is not specified whether a global order is applied to windows, but I doubt it, because of their Watermark approach
- they allow the user to assign timestamps for attribute-ts windows
- they deal with out-of-order data (-> not sure what the last sentence means exactly: "...causing the late elements to be emitted as they arrive." ?)
- their "Watermark" approach might yield high latencies

However, they don't talk about joins... :(
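For reference, the watermark-triggered, attribute-timestamp windowing summarized in the take-home message can be sketched as follows. This is a toy illustration of the model, not Google's implementation: tumbling event-time windows fire once the watermark passes their end, and elements arriving after that ("late" elements) are emitted individually as they arrive:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy sketch of attribute-timestamp (event-time) tumbling windows with a
// watermark trigger: a window [start, start+size) is emitted once the
// watermark passes its end; later ("late") elements are emitted as they arrive.
class WatermarkWindows {
    private final long size;                                        // window length in event time
    private final TreeMap<Long, List<Long>> open = new TreeMap<>(); // window start -> elements
    private long watermark = Long.MIN_VALUE;

    WatermarkWindows(long size) { this.size = size; }

    // Returns anything that fires as a result of this element.
    List<List<Long>> onElement(long timestamp) {
        List<List<Long>> fired = new ArrayList<>();
        long start = Math.floorDiv(timestamp, size) * size;
        if (start + size <= watermark) {
            fired.add(List.of(timestamp)); // late element: emit as it arrives
            return fired;
        }
        open.computeIfAbsent(start, s -> new ArrayList<>()).add(timestamp);
        return fired;
    }

    // Advancing the watermark fires every window whose end it has passed.
    List<List<Long>> onWatermark(long wm) {
        watermark = wm;
        List<List<Long>> fired = new ArrayList<>();
        while (!open.isEmpty() && open.firstKey() + size <= wm) {
            fired.add(open.pollFirstEntry().getValue());
        }
        return fired;
    }
}
```

The latency concern noted above is visible here: a window cannot fire before the watermark reaches its end, so a conservative watermark delays all output.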
There is a simple reason for that: They don't support joins. :D
They support n-ary co-group, however. This is implemented using tagging and a group-by-key operation. So only elements in the same window can end up in the same co-grouped result.
>>>> >>>> Cheers, >>>> Gyula >>>> >>>> On Thu, Apr 16, 2015 at 5:50 PM, Asterios Katsifodimos < >>>> [hidden email]> wrote: >>>> >>>>> As far as I see in [1], Peter's/Gyula's suggestion is what Infosphere >>>>> Streams does: symmetric hash join. >>>>> >>>>> From [1]: >>>>> "When a tuple is received on an input port, it is inserted into the >>>> window >>>>> corresponding to the input port, which causes the window to trigger. As >>>>> part of the trigger processing, the tuple is compared against all tuples >>>>> inside the window of the opposing input port. If the tuples match, then >>>> an >>>>> output tuple will be produced for each match. If at least one output was >>>>> generated, a window punctuation will be generated after all the outputs." >>>>> >>>>> Cheers, >>>>> Asterios >>>>> >>>>> [1] >>>>> >>>>> >>>> http://www-01.ibm.com/support/knowledgecenter/#!/SSCRJU_3.2.1/com.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc/doc/join.html >>>>> >>>>> >>>>> >>>>> On Thu, Apr 9, 2015 at 1:30 PM, Matthias J. Sax < >>>>> [hidden email]> wrote: >>>>> >>>>>> Hi Paris, >>>>>> >>>>>> thanks for the pointer to the Naiad paper. That is quite interesting. >>>>>> >>>>>> The paper I mentioned [1], does not describe the semantics in detail; >>>> it >>>>>> is more about the implementation for the stream-joins. However, it uses >>>>>> the same semantics (from my understanding) as proposed by Gyula. >>>>>> >>>>>> -Matthias >>>>>> >>>>>> [1] Kang, Naughton, Viglas. "Evaluationg Window Joins over Unbounded >>>>>> Streams". VLDB 2002. >>>>>> >>>>>> >>>>>> >>>>>> On 04/07/2015 12:38 PM, Paris Carbone wrote: >>>>>>> Hello Matthias, >>>>>>> >>>>>>> Sure, ordering guarantees are indeed a tricky thing, I recall having >>>>>> that discussion back in TU Berlin. Bear in mind thought that >>>> DataStream, >>>>>> our abstract data type, represents a *partitioned* unbounded sequence >>>> of >>>>>> events. 
There are no *global* ordering guarantees made whatsoever in >>>> that >>>>>> model across partitions. If you see it more generally there are many >>>>> “race >>>>>> conditions” in a distributed execution graph of vertices that process >>>>>> multiple inputs asynchronously, especially when you add joins and >>>>>> iterations into the mix (how do you deal with reprocessing “old” tuples >>>>>> that iterate in the graph). Btw have you checked the Naiad paper [1]? >>>>>> Stephan cited a while ago and it is quite relevant to that discussion. >>>>>>> >>>>>>> Also, can you cite the paper with the joining semantics you are >>>>>> referring to? That would be of good help I think. >>>>>>> >>>>>>> Paris >>>>>>> >>>>>>> [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf >>>>>>> >>>>>>> <https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf> >>>>>>> >>>>>>> <https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf> >>>>>>> On 07 Apr 2015, at 11:50, Matthias J. Sax < >>>>> [hidden email] >>>>>> <mailto:[hidden email]>> wrote: >>>>>>> >>>>>>> Hi @all, >>>>>>> >>>>>>> please keep me in the loop for this work. I am highly interested and >>>> I >>>>>>> want to help on it. >>>>>>> >>>>>>> My initial thoughts are as follows: >>>>>>> >>>>>>> 1) Currently, system timestamps are used and the suggested approach >>>> can >>>>>>> be seen as state-of-the-art (there is actually a research paper using >>>>>>> the exact same join semantic). Of course, the current approach is >>>>>>> inherently non-deterministic. The advantage is, that there is no >>>>>>> overhead in keeping track of the order of records and the latency >>>>> should >>>>>>> be very low. (Additionally, state-recovery is simplified. Because, >>>> the >>>>>>> processing in inherently non-deterministic, recovery can be done with >>>>>>> relaxed guarantees). 
2) The user should be able to "switch on" deterministic processing, i.e., records are timestamped (either externally when generated, or at the sources). Because deterministic processing adds some overhead, the user should opt in to it actively. In this case, the order must be preserved in each re-distribution step (merging is sufficient if order is preserved within each incoming channel). Furthermore, deterministic processing can be achieved by sound window semantics (and there is a bunch of them). Even for single-stream windows it is a tricky problem; for join windows it is even harder. From my point of view, it is less important which semantics are chosen; however, the user must be aware of how they work. The trickiest part of deterministic processing is dealing with duplicate timestamps (which cannot be avoided). The timestamping of (intermediate) result tuples is also an important question to be answered.

-Matthias

On 04/07/2015 11:37 AM, Gyula Fóra wrote:

Hey,

I agree with Kostas: if we define the exact semantics of how this works, it is no more ad hoc than any other stateful operator with multiple inputs. (And I don't think any other system supports something similar.)

We need to make some design choices similar to the ones we faced for windowing. We need to choose how to evaluate the windowing policies (globally or locally), because that affects which policies can be parallel, but I can work on these things.

I think this is an amazing feature, so I wouldn't necessarily rush the implementation for 0.9, though.

And thanks for helping write these down.
Gyula

On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas <[hidden email]> wrote:

Yes, we should write these semantics down. I volunteer to help.

I don't think that this is very ad hoc. The semantics are basically the following, assuming an element arriving from the left side:
(1) We find the right-side matches.
(2) We insert the left-side arrival into the left window.
(3) We recompute the left window.
We need to see whether right-window re-computation needs to be triggered as well. I think that this way of joining streams is also what symmetric hash join algorithms were meant to support.

Kostas

On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <[hidden email]> wrote:

Is the approach of joining an element at a time from one input against a window on the other input not a bit arbitrary?

This just joins against whatever currently happens to be in the window by the time the single element arrives - that is a bit unpredictable, right?

As a more general point: the whole semantics of windows and when they are triggered are a bit ad hoc now. It would be really good to start formalizing that a bit and put it down somewhere. Users need to be able to clearly understand and predict the output.

On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <[hidden email]> wrote:

I think it should be possible to make this compatible with the .window().every() calls. Maybe if there is some trigger set in "every" we would not join that stream 1 by 1, but every so many elements.
The problem here is that the window and every calls in this case are very different from the normal windowing semantics. The window would define the join window for each element of the other stream, while every would define how often this stream is joined with the other one.

We need to think about how to make this intuitive.

On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <[hidden email]> wrote:

That would be really neat. The problem I see there is that we currently do not distinguish between dataStream.window() and dataStream.window().every(): they both return WindowedDataStreams, and the TriggerPolicies of the every call do not make much sense in this setting (in fact, practically, the trigger is always set to a count of one).

But of course we could make it so that we check that the eviction is either null or a count of 1, and in every other case throw an exception while building the JobGraph.

On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <[hidden email]> wrote:

Or you could define it like this:

stream_A = a.window(...)
stream_B = b.window(...)

stream_A.join(stream_B).where().equals().with()

So a join would just be a join of two WindowedDataStreams. This would neatly move the windowing stuff into one place.

On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <[hidden email]> wrote:

Big +1 for the proposal from Peter and Gyula. I'm really for bringing the windowing and window-join APIs in sync.
On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <[hidden email]> wrote:

Hey guys,

As Aljoscha highlighted earlier, the current window-join semantics in the streaming API don't follow the changes in the windowing API. More precisely, we currently only support joins over time windows of equal size on both streams. The reason for this is that we now take a window from each of the two streams and join over these pairs. This would be a blocking operation if the windows were not closed at exactly the same time (and since we don't want that, we only allow time windows).

I talked with Peter, who came up with the initial idea of an alternative approach to stream joins, which works as follows:

Instead of pairing windows for joins, we do element-against-window joins. What this means is that whenever we receive an element from one of the streams, we join this element with the current window (this window is constantly updated) of the other stream. This is non-blocking for any window definition, as we don't have to wait for windows to be completed, and we can use it with any of our predefined policies like Time.of(...), Count.of(...), Delta.of(...).

Additionally, this also allows a very flexible way of defining window joins. With this we could also define grouped windowing inside of a join. An example of this would be: join all elements of Stream1 with the last 5 elements, by a given window key, of Stream2 on some join key.
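For concreteness, the grouped element-against-window join described above can be sketched as follows. This is only an illustration, not the actual Flink API or implementation; all class, method, and parameter names here are hypothetical:

```python
from collections import defaultdict, deque

class ElementAgainstWindowJoin:
    """Sketch of an element-against-window join for the grouped case:
    each Stream1 element is joined against the last `window_count`
    Stream2 elements held per window key. Names are illustrative only."""

    def __init__(self, window_count, window_key, join_key):
        self.window_key = window_key  # groups Stream2 into per-key windows
        self.join_key = join_key      # the equality predicate of the join
        # One bounded (count-based) window per window key of Stream2.
        self.windows = defaultdict(lambda: deque(maxlen=window_count))

    def on_stream2(self, element):
        # A Stream2 arrival only updates its group's window;
        # the deque's maxlen evicts the oldest element automatically.
        self.windows[self.window_key(element)].append(element)

    def on_stream1(self, element):
        # A Stream1 arrival is joined against the *current* windows,
        # without waiting for any window to close (non-blocking).
        k = self.join_key(element)
        return [(element, e)
                for win in self.windows.values()
                for e in win
                if self.join_key(e) == k]

join = ElementAgainstWindowJoin(
    window_count=5,
    window_key=lambda e: e["user"],
    join_key=lambda e: e["id"],
)
for i in range(7):
    join.on_stream2({"user": "u1", "id": i})  # only ids 2..6 remain buffered
pairs = join.on_stream1({"user": "x", "id": 6})  # matches the buffered id 6
```

A count window is used here purely for brevity; the same shape would apply to time or delta policies by swapping the eviction logic.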
This feature can be easily implemented on top of the current operators, so I already have a working prototype for the simple non-grouped case. My only concern is the API; the best thing I could come up with is something like this:

stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1, windowKey2).where(...).equalTo(...).with(...)

(the user can omit the "by" and "with" calls)

I think this new approach would be worthy of our "flexible windowing", in contrast with the current approach.

Regards,
Gyula
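For reference, the symmetric, time-windowed variant discussed up-thread (each arrival probes the opposing window, is inserted into its own window, and expired tuples are evicted) could be sketched like this. It is a minimal single-partition sketch assuming non-decreasing timestamps; all names are illustrative, not any system's real API:

```python
from collections import deque

class SymmetricHashJoin:
    """Sketch of a symmetric join over two time-windowed streams.

    On each arrival we evict expired tuples, probe the opposite
    window for key matches, and insert the new tuple into its own
    window -- mirroring the trigger processing quoted from the
    InfoSphere Streams documentation."""

    def __init__(self, window_size):
        self.window_size = window_size           # max age of a buffered tuple
        self.windows = {0: deque(), 1: deque()}  # per-input (ts, key, value)

    def _evict(self, side, now):
        # Tuples are appended in timestamp order, so expired ones
        # are always at the front of the deque.
        win = self.windows[side]
        while win and win[0][0] <= now - self.window_size:
            win.popleft()

    def on_element(self, side, ts, key, value):
        other = 1 - side
        self._evict(side, ts)
        self._evict(other, ts)
        # Probe the opposing window, then insert into our own;
        # output pairs are always ordered (left, right).
        matches = [(value, v) if side == 0 else (v, value)
                   for (_, k, v) in self.windows[other] if k == key]
        self.windows[side].append((ts, key, value))
        return matches

join = SymmetricHashJoin(window_size=10)
join.on_element(0, ts=1, key="a", value="L1")        # left arrival, no match
join.on_element(1, ts=5, key="a", value="R1")        # joins the buffered L1
join.on_element(1, ts=20, key="a", value="R2")       # L1 has expired by now
```

This also illustrates Stephan's point: the output for R1 depends on what happens to be in the left window at its arrival time, which is where the determinism discussion above comes in.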