Rework of the window-join semantics


Rework of the window-join semantics

Gyula Fóra
Hey guys,

As Aljoscha highlighted earlier, the current window-join semantics in the
streaming API don't follow the changes in the windowing API. More
precisely, we currently only support joins over time windows of equal size
on both streams. The reason is that we take a window of each of the two
streams and join these pairs. This would be a blocking operation if the
windows were not closed at exactly the same time (and since we don't want
blocking, we only allow time windows).

I talked with Peter who came up with the initial idea of an alternative
approach for stream joins which works as follows:

Instead of pairing windows for joins, we do element-against-window joins.
This means that whenever we receive an element from one of the streams, we
join it with the current window (which is constantly updated) of the other
stream. This is non-blocking for any window definition, since we don't
have to wait for windows to be completed, and it works with any of our
predefined policies like Time.of(...), Count.of(...), Delta.of(...).

Additionally, this allows very flexible ways of defining window joins.
With this we could also define grouped windowing inside a join. An example
would be: join all elements of Stream1 with the last 5 elements, grouped
by a given window key of Stream2, on some join key.

This feature can be implemented easily on top of the current operators, so
I already have a working prototype for the simple non-grouped case. My
only concern is the API; the best thing I could come up with is something
like this:

stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1,
windowKey2).where(...).equalTo(...).with(...)

(the user can omit the "by" and "with" calls)
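
For illustration, the grouped example above might look like the following
(Impression, Click and their fields are made-up types, and onWindow/by are
the proposed methods, not an existing API):

    DataStream<Impression> impressions = ...;
    DataStream<Click> clicks = ...;

    impressions.join(clicks)
        // each arriving element is joined against the other stream's
        // current window: a 10s time window on the impression side,
        // the last 5 clicks (per user) on the click side
        .onWindow(Time.of(10, TimeUnit.SECONDS), Count.of(5))
        .by(impression -> impression.userId, click -> click.userId)
        .where(impression -> impression.adId)
        .equalTo(click -> click.adId)
        .with((impression, click) -> new Tuple2<>(impression, click));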

I think this new approach would be worthy of our "flexible windowing" in
contrast with the current approach.

Regards,
Gyula

Re: Rework of the window-join semantics

Márton Balassi
Big +1 for the proposal from Peter and Gyula. I'm really for bringing the
windowing and window-join APIs in sync.


Re: Rework of the window-join semantics

Aljoscha Krettek
Or you could define it like this:

stream_A = a.window(...)
stream_B = b.window(...)

stream_A.join(stream_B).where().equals().with()

So a join would just be a join of two WindowedDataStreams. This would
neatly move the windowing stuff into one place.
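
With concrete policies that could read like this (made-up element types;
the join methods on WindowedDataStream are hypothetical at this point):

    WindowedDataStream<Impression> stream_A =
        a.window(Time.of(10, TimeUnit.SECONDS));
    WindowedDataStream<Click> stream_B = b.window(Count.of(5));

    stream_A.join(stream_B)
        .where(impression -> impression.adId)
        .equalTo(click -> click.adId)
        .with((impression, click) -> new Tuple2<>(impression, click));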


Re: Rework of the window-join semantics

Márton Balassi
That would be really neat. The problem I see there is that we currently do
not distinguish between dataStream.window() and dataStream.window().every():
they both return WindowedDataStreams, and the TriggerPolicies of the every
call do not make much sense in this setting (in fact, practically the
trigger is always set to a count of one).

But of course we could make it so that we check that the trigger is either
null or a count of 1, and in every other case throw an exception while
building the JobGraph.


Re: Rework of the window-join semantics

Gyula Fóra
I think it should be possible to make this compatible with the
.window().every() calls. Maybe if there is some trigger set in "every", we
would not join that stream element by element but every so many elements.
The problem here is that window and every are in this case very different
from the normal windowing semantics: the window would define the join
window for each element of the other stream, while every would define how
often I join this stream with the other one.

We need to think about how to make this intuitive.
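
For example (purely hypothetical, building on the snippet above):

    // window = the join window seen by each element of the other stream;
    // every  = only trigger joins for this stream every 2 arrivals
    stream_B = b.window(Count.of(5)).every(Count.of(2));
    stream_A.join(stream_B).where(...).equalTo(...).with(...)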


Re: Rework of the window-join semantics

Stephan Ewen
Is the approach of joining an element at a time from one input against a
window on the other input not a bit arbitrary?

This just joins against whatever happens to be the window at the time the
single element arrives - that is a bit unpredictable, right?

As a more general point: the whole semantics of windowing and when windows
are triggered are a bit ad-hoc now. It would be really good to start
formalizing that a bit and put it down somewhere. Users need to be able
to clearly understand and predict the output.

Re: Rework of the window-join semantics

Kostas Tzoumas
Yes, we should write these semantics down. I volunteer to help.

I don't think that this is very ad-hoc. The semantics are basically the
following, assuming an arriving element from the left side:
(1) We find the right-side matches
(2) We insert the left-side arrival into the left window
(3) We recompute the left window
We need to see whether right-window re-computation needs to be triggered as
well. I think that this way of joining streams is also what the symmetric
hash join algorithms were meant to support.
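
A minimal single-threaded sketch of these steps in plain Java (the windows
are simplified to count-based buffers, and all names are made up):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.function.BiConsumer;
    import java.util.function.BiPredicate;

    // Symmetric element-vs-window join: each arrival is matched against
    // the other side's current window, then inserted into its own window.
    class SymmetricWindowJoin<L, R> {
        private final Deque<L> leftWindow = new ArrayDeque<>();
        private final Deque<R> rightWindow = new ArrayDeque<>();
        private final int maxSize;                // simplified count eviction
        private final BiPredicate<L, R> matches;  // the join predicate
        private final BiConsumer<L, R> emit;      // downstream output

        SymmetricWindowJoin(int maxSize, BiPredicate<L, R> matches,
                            BiConsumer<L, R> emit) {
            this.maxSize = maxSize;
            this.matches = matches;
            this.emit = emit;
        }

        void onLeft(L e) {
            // (1) find the right-side matches in the current right window
            for (R r : rightWindow) if (matches.test(e, r)) emit.accept(e, r);
            // (2) insert the left-side arrival into the left window
            leftWindow.addLast(e);
            // (3) recompute (here: evict from) the left window
            while (leftWindow.size() > maxSize) leftWindow.removeFirst();
        }

        void onRight(R e) {            // symmetric for right-side arrivals
            for (L l : leftWindow) if (matches.test(l, e)) emit.accept(l, e);
            rightWindow.addLast(e);
            while (rightWindow.size() > maxSize) rightWindow.removeFirst();
        }
    }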

Kostas



Re: Rework of the window-join semantics

Gyula Fóra
Hey,

I agree with Kostas: if we define the exact semantics of how this works,
it is no more ad-hoc than any other stateful operator with multiple
inputs. (And I don't think any other system supports something similar.)

We need to make some design choices similar to the ones we faced for
windowing. We need to choose how we want to evaluate the windowing
policies (globally or locally), because that affects which kinds of
policies can run in parallel, but I can work on these things.

I think this is an amazing feature, so I wouldn't necessarily rush the
implementation for 0.9.

And thanks for helping write these down.

Gyula


Re: Rework of the window-join semantics

Matthias J. Sax
Hi @all,

please keep me in the loop for this work. I am highly interested and I
want to help on it.

My initial thoughts are as follows:

 1) Currently, system timestamps are used, and the suggested approach can
be seen as state-of-the-art (there is actually a research paper using the
exact same join semantics). Of course, the current approach is inherently
non-deterministic. The advantage is that there is no overhead in keeping
track of the order of records, and the latency should be very low.
(Additionally, state recovery is simplified: because the processing is
inherently non-deterministic, recovery can be done with relaxed
guarantees.)

 2) The user should be able to "switch on" deterministic processing, i.e.,
records are timestamped (either externally when generated, or at the
sources). Because deterministic processing adds some overhead, the user
should opt into it actively. In this case, the order must be preserved in
each re-distribution step (merging is sufficient if order is preserved
within each incoming channel). Furthermore, deterministic processing can
be achieved by sound window semantics (and there is a bunch of them). Even
for single-stream windows it's a tricky problem; for join windows it's
even harder. From my point of view, it is less important which semantics
are chosen; however, the user must be aware of how they work. The
trickiest part of deterministic processing is dealing with duplicate
timestamps (which cannot be avoided). The timestamping of (intermediate)
result tuples is also an important question to be answered.
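
As a sketch of the merge step in 2): given that each incoming channel is
already ordered by timestamp, a deterministic merge only needs to keep
picking the channel whose head has the smallest timestamp, breaking
duplicate timestamps by a fixed rule such as the channel id (all names
here are made up for illustration):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    // Deterministically merges per-channel, timestamp-ordered event
    // queues into one globally ordered sequence; ties on timestamp go to
    // the lower channel id, so a replay produces exactly the same order.
    class OrderedMerge {
        static final class Event {
            final long timestamp;
            final String payload;
            Event(long timestamp, String payload) {
                this.timestamp = timestamp;
                this.payload = payload;
            }
        }

        static List<Event> merge(List<Queue<Event>> channels) {
            List<Event> out = new ArrayList<>();
            while (true) {
                int best = -1;
                for (int i = 0; i < channels.size(); i++) {
                    Queue<Event> ch = channels.get(i);
                    if (ch.isEmpty()) continue;
                    // strictly-smaller comparison keeps the lower channel
                    // id on duplicate timestamps
                    if (best < 0
                            || ch.peek().timestamp
                               < channels.get(best).peek().timestamp) {
                        best = i;
                    }
                }
                if (best < 0) return out; // all channels drained
                // note: on unbounded streams an empty channel would force
                // the merge to wait here instead of being skipped
                out.add(channels.get(best).poll());
            }
        }
    }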


-Matthias





Re: Rework of the window-join semantics

Paris Carbone
Hello Matthias,

Sure, ordering guarantees are indeed a tricky thing; I recall having that discussion back at TU Berlin. Bear in mind though that DataStream, our abstract data type, represents a *partitioned* unbounded sequence of events. There are no *global* ordering guarantees whatsoever in that model across partitions. More generally, there are many “race conditions” in a distributed execution graph of vertices that process multiple inputs asynchronously, especially when you add joins and iterations into the mix (how do you deal with reprocessing “old” tuples that iterate in the graph?). Btw, have you checked the Naiad paper [1]? Stephan cited it a while ago and it is quite relevant to this discussion.

Also, can you cite the paper with the joining semantics you are referring to? That would be of good help, I think.

Paris

[1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf


Re: Rework of the window-join semantics

Bruno Cadonna

Hi Paris,

what's the reason for not guaranteeing global ordering across partitions
in the stream model? Is it the smaller overhead, or are there operations
that are not computable in a distributed environment with global ordering?

In any case, I agree with Matthias that the user should choose. If some
operations were not computable with global ordering, I would restrict the
set of operations for that mode.

Maybe it would also be helpful to collect use cases for each of the modes
proposed by Matthias, to understand the requirements for both modes.

Some (researchy) thoughts about indeterminism: How can the
indeterminism of the current setting be quantified? How "large" can it
grow with the current setting? Are there any limits that can be
guaranteed?

Cheers,
Bruno


-- 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

  Dr. Bruno Cadonna
  Postdoctoral Researcher

  Databases and Information Systems
  Department of Computer Science
  Humboldt-Universität zu Berlin

  http://www.informatik.hu-berlin.de/~cadonnab

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Reply | Threaded
Open this post in threaded view
|

Re: Rework of the window-join semantics

Stephan Ewen
Here is the state in Flink and why we have chosen not to do global ordering
at the moment:

 - Individual streams are FIFO; that means if the sender emits in order,
the receiver receives in order.

 - When streams are merged (think shuffle / partition-by), the streams
are not merged in order; instead, buffers from the streams are taken as
they come in.

 - We had a version that merged streams in order (for sort merging in
batch programs, actually) long ago, and it either performed horribly or
deadlocked. The reason is that all streams are stalled whenever a buffer
is missing from one stream, since the merge cannot continue in such a
case (head-of-the-line waiting). That backpressures streams
unnecessarily, slowing down the computation. If the streams depend
mutually on each other (think two partitioning steps), they frequently
deadlock completely.

 - The only way to make such an ordered merge work is by
stalling/buffering/punctuating the streams continuously, which is a lot
of work to implement and will definitely cost performance.

Therefore we have decided to go with a simpler model without global
ordering for now. If we start seeing that this has severe limitations
in practice, we may reconsider.
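
To make the head-of-the-line effect concrete, here is a minimal sketch
(hypothetical types and class names, plain Java, not our actual network
code) of an order-preserving two-way merge. It cannot emit anything
while either channel is empty, because a smaller element might still
arrive there:

import java.util.concurrent.BlockingQueue;

// Hypothetical illustration: an order-preserving two-way merge.
final class OrderedMerge<T extends Comparable<T>> {

    private final BlockingQueue<T> left;
    private final BlockingQueue<T> right;
    private T headLeft;   // buffered head element of the left channel
    private T headRight;  // buffered head element of the right channel

    OrderedMerge(BlockingQueue<T> left, BlockingQueue<T> right) {
        this.left = left;
        this.right = right;
    }

    T nextInOrder() throws InterruptedException {
        // Blocks until BOTH channels have a head element. One slow or
        // empty channel stalls the merge and backpressures the other
        // one -- the head-of-the-line waiting described above.
        if (headLeft == null) {
            headLeft = left.take();
        }
        if (headRight == null) {
            headRight = right.take();
        }
        if (headLeft.compareTo(headRight) <= 0) {
            T out = headLeft;
            headLeft = null;
            return out;
        } else {
            T out = headRight;
            headRight = null;
            return out;
        }
    }
}

Forwarding buffers as they come in avoids exactly these blocking take()
calls.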



On Wed, Apr 8, 2015 at 11:25 AM, Bruno Cadonna <
[hidden email]> wrote:

>
> Hi Paris,
>
> what's the reason for not guaranteeing global ordering across partitions
> in the stream model? Is it the smaller overhead or are there any
> operations not computable in a distributed environment with global
> ordering?
>
> In any case, I agree with Matthias that the user should choose. If
> operations were not computable with a global ordering, I would
> restrict the set of operations for that mode.
>
> Maybe, it would also be helpful to collect use cases for each of the
> modes proposed by Matthias to understand the requirements for both modes.
>
> Some (researchy) thoughts about indeterminism: How can the
> indeterminism of the current setting be quantified? How "large" can it
> grow with the current setting? Are there any limits that can be
> guaranteed?
>
> Cheers,
> Bruno
>
> [earlier quoted messages trimmed]
Reply | Threaded
Open this post in threaded view
|

Re: Rework of the window-join semantics

Matthias J. Sax
This reasoning makes absolute sense. That's why I suggested that the
user should actively choose ordered data processing...

About deadlocks: Those can be avoided if the buffers are consumed
continuously into an in-memory merge buffer (maybe with spilling to disk
if necessary). Of course, latency might suffer, and punctuations should
be used to relax this problem.
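
A rough sketch of what I mean (hypothetical class, plain Java): channels
are drained eagerly so senders never stall, and elements are released in
timestamp order once every channel's punctuation guarantees that nothing
earlier can still arrive:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;

// Hypothetical sketch of a punctuation-relaxed merge buffer.
final class PunctuatedMergeBuffer<T> {

    private final ToLongFunction<T> timestamp;
    private final PriorityQueue<T> buffer;
    private final long[] lastPunctuation; // latest punctuation per channel

    PunctuatedMergeBuffer(int numChannels, ToLongFunction<T> timestamp) {
        this.timestamp = timestamp;
        this.buffer = new PriorityQueue<>(Comparator.comparingLong(timestamp));
        this.lastPunctuation = new long[numChannels];
        Arrays.fill(lastPunctuation, Long.MIN_VALUE);
    }

    // Called for every arriving element; never blocks the sender.
    void add(T element) {
        buffer.add(element);
    }

    // Called when a punctuation arrives on a channel. Releases, in
    // timestamp order, all buffered elements that no later arrival on
    // any channel can still precede.
    List<T> onPunctuation(int channel, long punctuation) {
        lastPunctuation[channel] = punctuation;
        long safe = Arrays.stream(lastPunctuation).min().getAsLong();
        List<T> ordered = new ArrayList<>();
        while (!buffer.isEmpty()
                && timestamp.applyAsLong(buffer.peek()) <= safe) {
            ordered.add(buffer.poll());
        }
        return ordered;
    }
}

Spilling the buffer to disk once it grows too large would be the obvious
extension.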

As far as I understood, CEP might be an interesting use case for Flink,
and it depends on ordered data streams.


-Matthias



On 04/08/2015 11:50 AM, Stephan Ewen wrote:

> [quoted message trimmed; see Stephan's mail above]


Reply | Threaded
Open this post in threaded view
|

Re: Rework of the window-join semantics

Stephan Ewen
I agree, any ordering guarantees would need to be actively enabled.

How much of CEP depends on fully ordered streams? There is a lot you can do
with windows on event time, which are triggered by punctuations.

This is like a "soft" variant of ordered streams, where the order
relation holds only between windows, rather than between all events.
That makes it much cheaper to maintain.
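
A sketch of the "soft" variant (hypothetical code, just to illustrate
the idea): events go into event-time windows without any intra-window
ordering, and a window only fires once a punctuation shows it is
complete, so the windows themselves fire in order:

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch: tumbling event-time windows fired by
// punctuations. Assumes non-negative event times.
final class PunctuationTriggeredWindows<T> {

    private final long windowSize;
    // window start time -> buffered contents; the TreeMap keeps the
    // windows themselves ordered even though their contents are not
    private final TreeMap<Long, List<T>> windows = new TreeMap<>();

    PunctuationTriggeredWindows(long windowSize) {
        this.windowSize = windowSize;
    }

    // Elements may arrive in any order; they are only binned, not sorted.
    void onEvent(T event, long eventTime) {
        long windowStart = eventTime - (eventTime % windowSize);
        windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(event);
    }

    // Fire every window whose end lies at or before the punctuation.
    List<List<T>> onPunctuation(long punctuation) {
        List<List<T>> fired = new ArrayList<>();
        while (!windows.isEmpty()
                && windows.firstKey() + windowSize <= punctuation) {
            fired.add(windows.pollFirstEntry().getValue());
        }
        return fired;
    }
}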

On Wed, Apr 8, 2015 at 1:19 PM, Matthias J. Sax <
[hidden email]> wrote:

> [quoted message trimmed; see Matthias's mail above]
Reply | Threaded
Open this post in threaded view
|

Re: Rework of the window-join semantics

Bruno Cadonna

Hi Stephan,

How much of CEP depends on fully ordered streams depends on the
operators used in the pattern query. But in general, CEP operators need
fully ordered events within a window, or at least some strategy to deal
with out-of-order events.

If I got it right, you propose windows that are built on the occurrence
time of the event, i.e., the time when the event occurred in the real
world, and that a window is closed when a punctuation says that the
window is complete.

I agree with you that with such windows you can do a lot and decrease
the maintenance overhead. For example, some aggregations like sum,
count, and avg do not need ordered events within the window; they just
need complete windows that are ordered with respect to each other to be
deterministic. However, if the windows overlap, you again need the time
order to evict the oldest events from a window.
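
To illustrate the eviction point with a (hypothetical) sketch: the sum
itself is insensitive to arrival order, but evicting the expired part of
an overlapping window only works if the timestamps are kept alongside
the values:

import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical sketch: a sum over a sliding (overlapping) time window.
final class SlidingSum {

    private static final class Event {
        final long timestamp;
        final double value;
        Event(long timestamp, double value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final long windowLength;
    private final PriorityQueue<Event> byTime =
            new PriorityQueue<>(
                    Comparator.comparingLong((Event e) -> e.timestamp));
    private double sum;

    SlidingSum(long windowLength) {
        this.windowLength = windowLength;
    }

    // Addition commutes, so the arrival order of the events is irrelevant.
    void add(long timestamp, double value) {
        byTime.add(new Event(timestamp, value));
        sum += value;
    }

    // Eviction does not commute: we must know which events are oldest,
    // and that requires the time order.
    double sumAt(long windowEnd) {
        while (!byTime.isEmpty()
                && byTime.peek().timestamp <= windowEnd - windowLength) {
            sum -= byTime.poll().value;
        }
        return sum;
    }
}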

Cheers,
Bruno


On 08.04.2015 13:30, Stephan Ewen wrote:

> [quoted message trimmed; see Stephan's mail above]

-- 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

  Dr. Bruno Cadonna
  Postdoctoral Researcher

  Databases and Information Systems
  Department of Computer Science
  Humboldt-Universität zu Berlin

  http://www.informatik.hu-berlin.de/~cadonnab

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Reply | Threaded
Open this post in threaded view
|

Re: Rework of the window-join semantics

Stephan Ewen
With the current network layer and the agenda we have for windowing, we
should be able to support windows on event time in the near future.
Inside the window, you can sort all records by time and have a full
ordering. That is independent of the order of the stream.

How about this as a first goal?
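
As a (hypothetical) sketch of that first goal: when a completed window
fires, sorting its buffer by event time restores a full ordering inside
the window, independent of how the elements were interleaved on the
wire:

import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;

// Hypothetical sketch: restore full ordering inside a completed window.
final class OrderedWindowFiring {

    static <T> void fireOrdered(List<T> windowContents,
                                ToLongFunction<T> eventTime,
                                Consumer<List<T>> orderSensitiveFunction) {
        // Sorting by event time gives order-sensitive logic (e.g. a CEP
        // pattern matcher) a fully ordered view, no matter in which
        // order the elements arrived.
        windowContents.sort(Comparator.comparingLong(eventTime));
        orderSensitiveFunction.accept(windowContents);
    }
}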


Re: Rework of the window-join semantics

Bruno Cadonna

Hi Stephan,

that sounds reasonable to me.

Cheers,
Bruno


Re: Rework of the window-join semantics

Márton Balassi
In reply to this post by Stephan Ewen
+1 for Stephan's suggestion.

If we want to support event time and also sorting inside a window, we
should carefully consider where to actually put the timestamp of the
records. If the timestamp is part of the record, this is more
straightforward; but if we assign the timestamps in our sources, the
initial idea was to keep them hidden from the user and to use them only
in the network layer.

The more extreme solution with total ordering has a JIRA, but it has been
a bit quiet lately. [1]

[1] https://issues.apache.org/jira/browse/FLINK-1493?
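
To make the two options concrete, here is a rough sketch (purely
hypothetical types, none of this exists in Flink):

// Option 1: the timestamp is simply a field of the user's record type
// and is extracted explicitly, e.g. by the window operator.
final class SensorReading {
    final long eventTime;   // occurrence time, set where the event is created
    final double value;

    SensorReading(long eventTime, double value) {
        this.eventTime = eventTime;
        this.value = value;
    }
}

// Option 2: the source wraps each element in an envelope; the timestamp
// travels with the record through the network layer but stays hidden
// from user functions, which only ever see getValue().
final class TimestampedEnvelope<T> {
    private final long systemTimestamp;
    private final T value;

    TimestampedEnvelope(long systemTimestamp, T value) {
        this.systemTimestamp = systemTimestamp;
        this.value = value;
    }

    // Visible to the runtime only, e.g. for sorting inside a window.
    long getSystemTimestamp() {
        return systemTimestamp;
    }

    // The only part user code operates on.
    T getValue() {
        return value;
    }
}

Option 2 keeps the API clean, but it raises Matthias' earlier question of
which timestamp an (intermediate) result tuple should carry.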


Re: Rework of the window-join semantics

Matthias J. Sax
I started to work on an in-memory merge on a record-timestamp attribute
for totally ordered streams, but I got distracted by the Storm
compatibility layer... I will continue to work on it when I find some
extra time ;)
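
For reference, the heart of such a merge is a classic k-way merge keyed
on the record timestamp: since each input channel is FIFO and ordered by
its sender, a priority queue over the channel heads yields a totally
ordered output. Below is a minimal in-memory sketch under exactly those
assumptions (hypothetical types, not the actual work in progress):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Minimal sketch: merge k internally ordered channels into one totally
// ordered stream by a record-timestamp attribute. Everything is buffered
// in memory; the blocking, spilling and punctuation concerns discussed
// earlier in the thread are deliberately left out.
public class TimestampMerge {

    static final class Record {
        final long timestamp;
        final String payload;

        Record(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // One queue entry: the next record of a channel plus the rest of it.
    static final class Head {
        final Record record;
        final Iterator<Record> rest;

        Head(Record record, Iterator<Record> rest) {
            this.record = record;
            this.rest = rest;
        }
    }

    // Classic k-way merge with a priority queue over the channel heads.
    static List<Record> merge(List<List<Record>> channels) {
        PriorityQueue<Head> heads = new PriorityQueue<>(
                (a, b) -> Long.compare(a.record.timestamp, b.record.timestamp));
        for (List<Record> channel : channels) {
            Iterator<Record> it = channel.iterator();
            if (it.hasNext()) {
                heads.add(new Head(it.next(), it));
            }
        }
        List<Record> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            Head smallest = heads.poll();      // globally next timestamp
            merged.add(smallest.record);
            if (smallest.rest.hasNext()) {     // refill from the same channel
                heads.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }
}

A real streaming implementation cannot emit until every open channel has
a head available (the head-of-the-line problem Stephan described), which
is where punctuations and spilling come in; ties on duplicate timestamps
are also resolved arbitrarily here.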

On 04/08/2015 03:18 PM, Márton Balassi wrote:

> +1 for Stephan's suggestion.
>
> If we would like to support event time and also sorting inside a window we
> should carefully consider where to actually put the timestamp of the
> records. If the timestamp is part of the record then it is more
> straight-forward, but in case of we assign the timestamps in our sources
> the initial idea was to keep these hidden from the user and only use it in
> the network layer.
>
> The more extreme solution with total ordering has a JIRA, but has been a
> bit silent lately. [1]
>
> [1] https://issues.apache.org/jira/browse/FLINK-1493?
>
> On Wed, Apr 8, 2015 at 3:06 PM, Stephan Ewen <[hidden email]> wrote:
>
>> With the current network layer and the agenda we have for windowing, we
>> should be able to support widows on event time this in the near future.
>> Inside the window, you can sort all records by time and have a full
>> ordering. That is independent of the order of the stream.
>>
>> How about this as a first goal?
>>
>> On Wed, Apr 8, 2015 at 2:50 PM, Bruno Cadonna <
>> [hidden email]> wrote:
>>
> Hi Stephan,
>
> how much of CEP depends on fully ordered streams depends on the
> operators that you use in the pattern query. But in general, they need
> fully ordered events within a window or at least some strategies to
> deal with out-of-order events.
>
> If I got it right, you propose windows that are build on the
> occurrence time of the event, i.e., the time when the event occurred
> in the real world, and then to close a window when a punctuation says
> that the window is complete.
>
> I agree with you that with such windows, you can do a lot and decrease
> overhead for maintainance. For example, some aggregations like sum,
> count, avg do not need ordered events in the window, they just need
> complete windows that are ordered with respect to each other to be
> deterministic. However, if the windows overlap you need again the time
> order to evict the oldest events from a window.
>
> Cheers,
> Bruno
>
>
> On 08.04.2015 13:30, Stephan Ewen wrote:
>>>>> I agree, any ordering guarantees would need to be actively
>>>>> enabled.
>>>>>
>>>>> How much of CEP depends on fully ordered streams? There is a lot
>>>>> you can do with windows on event time, which are triggered by
>>>>> punctuations.
>>>>>
>>>>> This is like a "soft" variant of the ordered streams, where order
>>>>> relation occurs only with between windows, rather than between all
>>>>> events. That makes it much cheaper to maintain.
>>>>>
>>>>> On Wed, Apr 8, 2015 at 1:19 PM, Matthias J. Sax <
>>>>> [hidden email]> wrote:
>>>>>
>>>>>> This reasoning makes absolutely sense. That's why I suggested,
>>>>>> that the user should actively choose ordered data processing...
>>>>>>
>>>>>> About deadlocks: Those can be avoided, if the buffers are
>>>>>> consumed continuously in an in-memory merge buffer (maybe with
>>>>>> spilling to disk if necessary). Of course, latency might suffer
>>>>>> and punctuations should be used to relax this problem.
>>>>>>
>>>>>> As far as I understood, CEP might be an interesting use-case for
>>>>>> Flink, and it depend on ordered data streams.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 04/08/2015 11:50 AM, Stephan Ewen wrote:
>>>>>>> Here is the state in Flink and why we have chosen not to do
>>>>>>> global
>>>>>> ordering
>>>>>>> at the moment:
>>>>>>>
>>>>>>> - Individual streams are FIFO, that means if the sender emits
>>>>>>> in order, the receiver receives in order.
>>>>>>>
>>>>>>> - When streams are merged (think shuffle / partition-by), then
>>>>>>> the
>>>>>> streams
>>>>>>> are not merged, but buffers from the streams are taken as the
>>>>>>> come in.
>>>>>>>
>>>>>>> - We had a version that merged streams (for sort merging in
>>>>>>> batch programs, actually) long ago, an it performed either
>>>>>>> horribly or deadlocked. The reason is that all streams are
>>>>>>> always stalled if a buffer is missing from one stream, since
>>>>>>> the merge cannot continue in such a
>>>>>> case
>>>>>>> (head-of-the-line waiting). That backpressures streams
>>>>>>> unnecessarily, slowing down computation. If the streams depend
>>>>>>> mutually on each other (think two partitioning steps), they
>>>>>>> frequently dadlock completely.
>>>>>>>
>>>>>>> - The only way to do that is by stalling/buffering/punctuating
>>>>>>> streams continuously, which is a lot of work to implement and
>>>>>>> will definitely
>>>>>> cost
>>>>>>> performance.
>>>>>>>
>>>>>>> Therefore we have decided going for a simpler model without
>>>>>>> global
>>>>>> ordering
>>>>>>> for now. If we start seeing that this has sever limitations in
>>>>>>> practice,
>>>>>> we
>>>>>>> may reconsider that.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Apr 8, 2015 at 11:25 AM, Bruno Cadonna <
>>>>>>> [hidden email]> wrote:
>>>>>>>
>>>>>>> Hi Paris,
>>>>>>>
>>>>>>> what's the reason for not guaranteeing global ordering across
>>>>>>> partitions in the stream model? Is it the smaller overhead or
>>>>>>> are there any operations not computable in a distributed
>>>>>>> environment with global ordering?
>>>>>>>
>>>>>>> In any case, I agree with Matthias that the user should choose.
>>>>>>> If operations were not computable with a global ordering, I
>>>>>>> would restrict the set of operations for that mode.
>>>>>>>
>>>>>>> Maybe, it would also be helpful to collect use cases for each
>>>>>>> of the modes proposed by Matthias to understand the
>>>>>>> requirements for both modes.
>>>>>>>
>>>>>>> Some (researchy) thoughts about indeterminism: How can the
>>>>>>> indeterminism of the current setting be quantified? How "large"
>>>>>>> can it grow with the current setting? Are there any limits that
>>>>>>> can be guaranteed?
>>>>>>>
>>>>>>> Cheers, Bruno
>>>>>>>
>>>>>>> On 07.04.2015 12:38, Paris Carbone wrote:
>>>>>>>>>> Hello Matthias,
>>>>>>>>>>
>>>>>>>>>> Sure, ordering guarantees are indeed a tricky thing, I
>>>>>>>>>> recall having that discussion back in TU Berlin. Bear in
>>>>>>>>>> mind though that DataStream, our abstract data type,
>>>>>>>>>> represents a *partitioned* unbounded sequence of events.
>>>>>>>>>> There are no *global* ordering guarantees made whatsoever
>>>>>>>>>> in that model across partitions. If you see it more
>>>>>>>>>> generally, there are many “race conditions” in a
>>>>>>>>>> distributed execution graph of vertices that process
>>>>>>>>>> multiple inputs asynchronously, especially when you add
>>>>>>>>>> joins and iterations into the mix (how do you deal with
>>>>>>>>>> reprocessing “old” tuples that iterate in the graph?). Btw,
>>>>>>>>>> have you checked the Naiad paper [1]? Stephan cited it a
>>>>>>>>>> while ago and it is quite relevant to that discussion.
>>>>>>>>>>
>>>>>>>>>> Also, can you cite the paper with the joining semantics
>>>>>>>>>> you are referring to? That would be of good help I
>>>>>>>>>> think.
>>>>>>>>>>
>>>>>>>>>> Paris
>>>>>>>>>>
>>>>>>>>>> [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
>>>>>>>>>>
>>>>>>>>>> On 07 Apr 2015, at 11:50, Matthias J. Sax <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi @all,
>>>>>>>>>>
>>>>>>>>>> please keep me in the loop for this work. I am highly
>>>>>>>>>> interested and I want to help on it.
>>>>>>>>>>
>>>>>>>>>> My initial thoughts are as follows:
>>>>>>>>>>
>>>>>>>>>> 1) Currently, system timestamps are used and the
>>>>>>>>>> suggested approach can be seen as state-of-the-art (there
>>>>>>>>>> is actually a research paper using the exact same join
>>>>>>>>>> semantics). Of course, the current approach is inherently
>>>>>>>>>> non-deterministic. The advantage is that there is no
>>>>>>>>>> overhead in keeping track of the order of records, and the
>>>>>>>>>> latency should be very low. (Additionally, state recovery
>>>>>>>>>> is simplified: because the processing is inherently
>>>>>>>>>> non-deterministic, recovery can be done with relaxed
>>>>>>>>>> guarantees.)
>>>>>>>>>>
>>>>>>>>>> 2) The user should be able to "switch on" deterministic
>>>>>>>>>> processing, i.e., records are timestamped (either
>>>>>>>>>> externally when generated, or timestamped at the
>>>>>>>>>> sources). Because deterministic processing adds some
>>>>>>>>>> overhead, the user should decide for it actively. In this
>>>>>>>>>> case, the order must be preserved in each re-distribution
>>>>>>>>>> step (merging is sufficient if order is preserved within
>>>>>>>>>> each incoming channel). Furthermore, deterministic
>>>>>>>>>> processing can be achieved by sound window semantics (and
>>>>>>>>>> there is a bunch of them). Even for single-stream windows
>>>>>>>>>> it's a tricky problem; for join windows it's even harder.
>>>>>>>>>> From my point of view, it is less important which
>>>>>>>>>> semantics are chosen; however, the user must be aware of
>>>>>>>>>> how they work. The trickiest part of deterministic
>>>>>>>>>> processing is to deal with duplicate timestamps (which
>>>>>>>>>> cannot be avoided). The timestamping of (intermediate)
>>>>>>>>>> result tuples is also an important question to be
>>>>>>>>>> answered.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
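One possible way to deal with the duplicate-timestamp problem (a sketch
of the idea only, not a settled design): extend the timestamp with
tie-breakers, for example a source id and a per-source sequence number,
so that all records fall into a total order that every run agrees on.

import java.util.Comparator;

// Illustrative only: a record whose ordering key is (timestamp, sourceId,
// seqNo), giving a deterministic total order despite duplicate timestamps.
class StampedRecord {
    final long timestamp;   // assigned externally or at the source
    final int sourceId;     // tie-breaker 1: which source emitted it
    final long seqNo;       // tie-breaker 2: per-source sequence number
    final Object payload;

    StampedRecord(long timestamp, int sourceId, long seqNo, Object payload) {
        this.timestamp = timestamp;
        this.sourceId = sourceId;
        this.seqNo = seqNo;
        this.payload = payload;
    }

    static final Comparator<StampedRecord> TOTAL_ORDER =
            Comparator.comparingLong((StampedRecord r) -> r.timestamp)
                      .thenComparingInt(r -> r.sourceId)
                      .thenComparingLong(r -> r.seqNo);
}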
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 04/07/2015 11:37 AM, Gyula Fóra wrote:
>>>>>>>>>>
>>>>>>>>>> Hey,
>>>>>>>>>>
>>>>>>>>>> I agree with Kostas: if we define the exact semantics of how
>>>>>>>>>> this works, this is not more ad-hoc than any other
>>>>>>>>>> stateful operator with multiple inputs. (And I don't
>>>>>>>>>> think any other system supports something similar.)
>>>>>>>>>>
>>>>>>>>>> We need to make some design choices that are similar to
>>>>>>>>>> the issues we had for windowing. We need to choose how we
>>>>>>>>>> want to evaluate the windowing policies (global or local)
>>>>>>>>>> because that affects what kind of policies can be
>>>>>>>>>> parallel, but I can work on these things.
>>>>>>>>>>
>>>>>>>>>> I think this is an amazing feature, so I wouldn't
>>>>>>>>>> necessarily rush the implementation for 0.9 though.
>>>>>>>>>>
>>>>>>>>>> And thanks for helping writing these down.
>>>>>>>>>>
>>>>>>>>>> Gyula
>>>>>>>>>>
>>>>>>>>>> On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas
>>>>>>>>>> <[hidden email]<mailto:[hidden email]>> wrote:
>>>>>>>>>>
>>>>>>>>>> Yes, we should write these semantics down. I volunteer to
>>>>>>>>>> help.
>>>>>>>>>>
>>>>>>>>>> I don't think that this is very ad-hoc. The semantics are
>>>>>>>>>> basically the following. Assuming an arriving element
>>>>>>>>>> from the left side:
>>>>>>>>>> (1) We find the right-side matches
>>>>>>>>>> (2) We insert the left-side arrival into the left window
>>>>>>>>>> (3) We recompute the left window
>>>>>>>>>> We need to see whether right window re-computation needs to
>>>>>>>>>> be triggered as well. I think that this way of joining
>>>>>>>>>> streams is also what the symmetric hash join algorithms
>>>>>>>>>> were meant to support.
>>>>>>>>>>
>>>>>>>>>> Kostas
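A sketch of these three steps in the style of a symmetric hash join
(illustrative Java, not the actual operator; the eviction policy is left
abstract). The same method would run with the sides swapped when an
element arrives on the right.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Illustrative only: element-against-window join for a left-side arrival.
class SymmetricWindowJoin<L, R, K> {
    private final Function<L, K> leftKey;
    private final Function<R, K> rightKey; // used by the symmetric onRight (not shown)
    private final BiConsumer<L, R> emit;
    private final Map<K, List<L>> leftWindow = new HashMap<>();
    private final Map<K, List<R>> rightWindow = new HashMap<>();

    SymmetricWindowJoin(Function<L, K> leftKey, Function<R, K> rightKey, BiConsumer<L, R> emit) {
        this.leftKey = leftKey;
        this.rightKey = rightKey;
        this.emit = emit;
    }

    void onLeft(L element) {
        K key = leftKey.apply(element);
        // (1) find the right-side matches in the CURRENT right window
        for (R r : rightWindow.getOrDefault(key, new ArrayList<>())) {
            emit.accept(element, r);
        }
        // (2) insert the left-side arrival into the left window
        leftWindow.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        // (3) recompute the left window, i.e. apply its eviction policy
        evictLeft(); // Time.of / Count.of / Delta.of would plug in here
    }

    private void evictLeft() { /* eviction policy deliberately left abstract */ }
}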
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen
>>>>>>>>>> <[hidden email]<mailto:[hidden email]>> wrote:
>>>>>>>>>>
>>>>>>>>>> Is the approach of joining an element at a time from one
>>>>>>>>>> input against a window on the other input not a bit
>>>>>>>>>> arbitrary?
>>>>>>>>>>
>>>>>>>>>> This just joins whatever currently happens to be the
>>>>>>>>>> window by the time the single element arrives - that is a
>>>>>>>>>> bit unpredictable, right?
>>>>>>>>>>
>>>>>>>>>> As a more general point: the whole semantics of windowing,
>>>>>>>>>> and when windows are triggered, are a bit ad-hoc now. It
>>>>>>>>>> would be really good to start formalizing that a bit and
>>>>>>>>>> put it down somewhere. Users need to be able to clearly
>>>>>>>>>> understand and predict the output.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra
>>>>>>>>>> <[hidden email]<mailto:[hidden email]>>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> I think it should be possible to make this compatible
>>>>>>>>>> with the .window().every() calls. Maybe if there is some
>>>>>>>>>> trigger set in "every" we would not join that stream 1 by
>>>>>>>>>> 1 but every so many elements. The problem here is that
>>>>>>>>>> the window and every in this case are very different
>>>>>>>>>> from the normal windowing semantics. The window would
>>>>>>>>>> define the join window for each element of the other
>>>>>>>>>> stream, while every would define how often I join this
>>>>>>>>>> stream with the other one.
>>>>>>>>>>
>>>>>>>>>> We need to think about how to make this intuitive.
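Under this interpretation, a join could read roughly like this
(hypothetical API: every() on a join does not exist yet, and the key
selectors and join function are placeholders):

stream_A.join(stream_B)
        .onWindow(Time.of(5, TimeUnit.SECONDS), // each B element joins the last 5s of stream_A
                  Count.of(100))                // each A element joins the last 100 elements of stream_B
        .every(Count.of(10))                    // hypothetical: join in batches of 10 instead of 1 by 1
        .where(keySelectorA)
        .equalTo(keySelectorB)
        .with(joinFunction);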
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <
>>>>>>>>>> [hidden email]<mailto:[hidden email]>>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> That would be really neat. The problem I see there is that
>>>>>>>>>> we do not currently distinguish between dataStream.window()
>>>>>>>>>> and dataStream.window().every(): they both return
>>>>>>>>>> WindowedDataStreams, and TriggerPolicies of the every call
>>>>>>>>>> do not make much sense in this setting (in fact,
>>>>>>>>>> practically, the trigger is always set to a count of one).
>>>>>>>>>>
>>>>>>>>>> But of course we could make it in a way that we check
>>>>>>>>>> that the eviction is either null or a count of 1; in
>>>>>>>>>> every other case we throw an exception while building the
>>>>>>>>>> JobGraph.
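The check could look roughly like this at graph-build time (all names
hypothetical; the real policy classes may differ):

// Illustrative only: fail fast while building the JobGraph if the
// every-policy of a windowed join is anything but absent or count-of-one.
class JoinValidation {
    static void validateJoinEveryPolicy(Object everyPolicy) {
        boolean ok = (everyPolicy == null)
                || (everyPolicy instanceof CountPolicy
                    && ((CountPolicy) everyPolicy).count == 1);
        if (!ok) {
            throw new IllegalArgumentException(
                "Window joins support only every(Count.of(1)) or no every() call, got: " + everyPolicy);
        }
    }

    // Hypothetical stand-in for the real trigger/eviction policy class.
    static class CountPolicy {
        final int count;
        CountPolicy(int count) { this.count = count; }
    }
}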
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <
>>>>>>>>>> [hidden email]<mailto:[hidden email]>> wrote:
>>>>>>>>>>
>>>>>>>>>> Or you could define it like this:
>>>>>>>>>>
>>>>>>>>>> stream_A = a.window(...)
>>>>>>>>>> stream_B = b.window(...)
>>>>>>>>>>
>>>>>>>>>> stream_A.join(stream_B).where().equals().with()
>>>>>>>>>>
>>>>>>>>>> So a join would just be a join of two
>>>>>>>>>> WindowedDataStreamS. This would neatly move the windowing
>>>>>>>>>> stuff into one place.
>>>>>>>>>>

Reply | Threaded
Open this post in threaded view
|

Re: Rework of the window-join semantics

Matthias J. Sax
In reply to this post by Paris Carbone
Hi Paris,

thanks for the pointer to the Naiad paper. That is quite interesting.

The paper I mentioned [1] does not describe the semantics in detail; it
is more about the implementation of the stream joins. However, it uses
the same semantics (from my understanding) as proposed by Gyula.

-Matthias

[1] Kang, Naughton, Viglas. "Evaluating Window Joins over Unbounded
Streams". ICDE 2003.



On 04/07/2015 12:38 PM, Paris Carbone wrote:

> Hello Matthias,
>
> Sure, ordering guarantees are indeed a tricky thing, I recall having that discussion back in TU Berlin. Bear in mind though that DataStream, our abstract data type, represents a *partitioned* unbounded sequence of events. There are no *global* ordering guarantees made whatsoever in that model across partitions. If you see it more generally, there are many “race conditions” in a distributed execution graph of vertices that process multiple inputs asynchronously, especially when you add joins and iterations into the mix (how do you deal with reprocessing “old” tuples that iterate in the graph?). Btw, have you checked the Naiad paper [1]? Stephan cited it a while ago and it is quite relevant to that discussion.
>
> Also, can you cite the paper with the joining semantics you are referring to? That would be of good help I think.
>
> Paris
>
> [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf

