About Interplay of Merged Streams, Output Selectors and Checkpoint Barriers (and Watermarks)

Aljoscha Krettek-2
Hi Folks,
as I said in the subject: how will this work? I'm currently thinking
about how to implement low watermarks in Streaming. I think the
implementation should be quite similar to how the checkpointing
barriers will be implemented, since they also flush out stuff.

Now I'm wondering how this will work with merged streams and the
output selectors (split streams). It seems to me that there are a lot
of paths that elements can take to arrive at operators. The problem I
have is that an operator can only emit a low watermark itself if it
knows that all input operators have sent it a low watermark with that
value (the low watermark is the minimum of the low watermarks of all
upstream operators). I imagine that the checkpoint barriers exhibit
the same behaviour.

Do we maybe have to add an explicit union (merge) operator and change
how split streams are implemented?

What are your thoughts?

Cheers,
Aljoscha

Gyula Fóra
Hi,

Checkpoint barriers are handled directly on top of the network layer, and
you are right that they work similarly: an input channel is blocked once its
barrier arrives, until the barrier has been received from all of them.
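
For readers of the archive, the alignment described here can be illustrated with a minimal Java sketch. It is not the actual network-layer code: `BarrierAligner`, `onRecord`, `onBarrier`, and the `<checkpoint>` marker are hypothetical names, records are plain strings, and only one checkpoint is assumed to be in flight at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical sketch of barrier alignment; not Flink's real classes. */
class BarrierAligner {
    private final boolean[] blocked;        // channels that already delivered the barrier
    private final List<Queue<String>> held; // records held back per blocked channel
    private final List<String> output = new ArrayList<>(); // what the operator gets to see
    private int barriersSeen = 0;

    BarrierAligner(int numChannels) {
        blocked = new boolean[numChannels];
        held = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            held.add(new ArrayDeque<>());
        }
    }

    /** A record passes through unless its channel is currently blocked. */
    void onRecord(int channel, String record) {
        if (blocked[channel]) {
            held.get(channel).add(record);
        } else {
            output.add(record);
        }
    }

    /** A barrier blocks its channel; the last barrier triggers the checkpoint. */
    void onBarrier(int channel) {
        if (blocked[channel]) {
            // Simplification: a second barrier on a blocked channel is not modelled.
            throw new IllegalStateException("duplicate barrier on channel " + channel);
        }
        blocked[channel] = true;
        barriersSeen++;
        if (barriersSeen == blocked.length) {
            output.add("<checkpoint>"); // stand-in for taking the state snapshot
            for (int i = 0; i < blocked.length; i++) {
                blocked[i] = false;
                Queue<String> queue = held.get(i);
                while (!queue.isEmpty()) {
                    output.add(queue.poll()); // release records that were held back
                }
            }
            barriersSeen = 0;
        }
    }

    List<String> output() {
        return output;
    }
}

class BarrierAlignerDemo {
    public static void main(String[] args) {
        BarrierAligner aligner = new BarrierAligner(2);
        aligner.onRecord(0, "a1");
        aligner.onBarrier(0);      // channel 0 is now blocked
        aligner.onRecord(0, "a2"); // held back until alignment completes
        aligner.onRecord(1, "b1"); // channel 1 still flows
        aligner.onBarrier(1);      // all barriers seen: checkpoint, then release
        System.out.println(aligner.output()); // prints [a1, b1, <checkpoint>, a2]
    }
}
```

The point of the sketch is only that records arriving behind a barrier are kept out of the operator until every channel has delivered its barrier.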

One way of implementing this on the operator level would be to add a way
to ask the input reader for the channel index of the last record. This way the
operator could keep track of the channels from which it has received
records and execute the watermark logic. The IndexedReaders
implemented the necessary functionality but were accidentally patched away
by some earlier changes (as they were not used anyway).

Adding a union operator is probably overkill and would pose the same
difficulties when implementing it.

Cheers,
Gyula

In reply to Aljoscha Krettek's post of Tue, May 12, 2015 at 2:40 PM

Matthias J. Sax
Hi,

I don't understand why we need the same mechanism twice in the code...
Could checkpoint barriers and low watermarks be unified (or one built
on top of, or by using, the other)?

-Matthias


In reply to Gyula Fóra's post of 05/12/2015 02:47 PM

Stephan Ewen
In reply to this post by Aljoscha Krettek-2
I would like to refrain from adding additional tasks as much as possible.

I agree with Gyula that extending the reader to track watermarks and call a
handler whenever the watermark advances would be a nice way to implement
this.
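
A rough Java sketch of that idea, for illustration only: the reader side keeps the latest watermark per input channel and calls a handler whenever the minimum over all channels advances. `WatermarkTracker`, `onWatermark`, and the use of `LongConsumer` as the handler type are assumptions made for this example, not existing reader APIs.

```java
import java.util.Arrays;
import java.util.function.LongConsumer;

/** Hypothetical sketch: tracks per-channel watermarks and reports the minimum. */
class WatermarkTracker {
    private final long[] channelWatermarks; // latest watermark seen on each channel
    private final LongConsumer onAdvance;   // handler called when the minimum advances
    private long current = Long.MIN_VALUE;  // last watermark handed to the handler

    WatermarkTracker(int numChannels, LongConsumer onAdvance) {
        this.channelWatermarks = new long[numChannels];
        Arrays.fill(this.channelWatermarks, Long.MIN_VALUE);
        this.onAdvance = onAdvance;
    }

    /** Called with the index of the channel the watermark arrived on. */
    void onWatermark(int channel, long watermark) {
        // Per-channel watermarks never move backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);

        // The operator's low watermark is the minimum over all input channels.
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        if (min > current) {
            current = min;
            onAdvance.accept(min);
        }
    }

    long current() {
        return current;
    }
}

class WatermarkTrackerDemo {
    public static void main(String[] args) {
        WatermarkTracker tracker =
                new WatermarkTracker(2, w -> System.out.println("advanced to " + w));
        tracker.onWatermark(0, 10); // channel 1 has sent nothing yet: no callback
        tracker.onWatermark(1, 5);  // prints "advanced to 5"
        tracker.onWatermark(1, 20); // prints "advanced to 10"
        tracker.onWatermark(0, 15); // prints "advanced to 15"
    }
}
```

Note that nothing is blocked here: the handler fires as a side effect of reading, which is what distinguishes this from barrier alignment.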

Gyula Fóra
In reply to Matthias J. Sax's post of Tue, May 12, 2015 at 2:48 PM
It's actually a very different mechanism, as watermarks will not block the
computation.

Stephan Ewen
Watermarks also don't need to flush buffers; they can simply be queued
in as special stream records, if we want to.
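
As a small illustration of "special stream records", the following Java sketch puts data records and watermarks into the same queue and lets the consumer tell them apart on read. `StreamElement`, its factory methods, and `InBandWatermarkDemo` are hypothetical names chosen for this example; the actual element type may look different.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical element type: either a data record or a watermark. */
final class StreamElement {
    final String value;        // payload; null for watermarks
    final long timestamp;      // event time of the record, or the watermark value
    final boolean isWatermark;

    private StreamElement(String value, long timestamp, boolean isWatermark) {
        this.value = value;
        this.timestamp = timestamp;
        this.isWatermark = isWatermark;
    }

    static StreamElement record(String value, long timestamp) {
        return new StreamElement(value, timestamp, false);
    }

    static StreamElement watermark(long timestamp) {
        return new StreamElement(null, timestamp, true);
    }
}

class InBandWatermarkDemo {
    /** Reads the channel in order; watermarks are dispatched like any other element. */
    static List<String> drain(Queue<StreamElement> channel) {
        List<String> log = new ArrayList<>();
        StreamElement element;
        while ((element = channel.poll()) != null) {
            if (element.isWatermark) {
                log.add("watermark@" + element.timestamp);
            } else {
                log.add("record:" + element.value);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        Queue<StreamElement> channel = new ArrayDeque<>();
        channel.add(StreamElement.record("a", 1L));
        channel.add(StreamElement.record("b", 2L));
        channel.add(StreamElement.watermark(2L)); // queued in line, nothing is flushed
        channel.add(StreamElement.record("c", 3L));
        // prints [record:a, record:b, watermark@2, record:c]
        System.out.println(drain(channel));
    }
}
```

Because the watermark travels in the same queue, it keeps its position relative to the records around it without any flushing or blocking.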

In reply to Gyula Fóra's post of Tue, May 12, 2015 at 2:53 PM

Aljoscha Krettek-2
In reply to this post by Gyula Fóra
What Stephan mentioned is exactly how I'm planning to implement it,
yes. How do the barriers work with chained tasks and OutputSelectors?
Or is there no special-case code required?
