[DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Aljoscha Krettek
Hi Everyone!

We would like to start a discussion on "FLIP-126: Unify (and separate)
Watermark Assigners" [1]. This work was started by Stephan in an
experimental branch. I expanded on that work to provide a PoC for the
changes proposed in this FLIP: [2].

Currently, we have two different flavours of Watermark
Assigners: AssignerWithPunctuatedWatermarks
and AssignerWithPeriodicWatermarks. Both of them extend
TimestampAssigner. This means that sources that want to support
watermark assignment/extraction in the source need to support two
separate interfaces, and we need two operator implementations for the
different flavours. This also makes features such as generic support
for idleness detection more complicated to implement, because we again
have to support two types of watermark assigners.

In this FLIP we propose two things:

1) Unify the Watermark Assigners into one interface, WatermarkGenerator.
2) Separate this new interface from the TimestampAssigner.

The motivation for the first is to simplify future implementations and
reduce code duplication. The motivation for the second point is again code
deduplication: most assigners currently have to extend from some base
timestamp extractor or duplicate the extraction logic, or users have to
override an abstract method of the watermark assigner to provide the
timestamp extraction logic.
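For illustration, the two proposals can be sketched in plain Java. This is a minimal, self-contained sketch: the type names mirror the ones used in this thread (Watermark, WatermarkOutput, TimestampAssigner, WatermarkGenerator), but the definitions are simplified stand-ins written for this example, not the classes in the PoC branch or in Flink. BoundedOutOfOrdernessGenerator is a hypothetical generator showing how formerly "periodic" logic fits the unified interface while timestamp extraction lives in a separate TimestampAssigner.

```java
// Simplified stand-ins for the types discussed in the FLIP (illustrative only).
final class Watermark {
    private final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }

    long getTimestamp() {
        return timestamp;
    }
}

// The "collector" a generator writes to.
interface WatermarkOutput {
    void emitWatermark(Watermark watermark);

    void markIdle();
}

// Timestamp extraction is its own interface, no longer a super-type of the generator.
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// One interface for both flavours: punctuated logic goes into onEvent(),
// periodic logic goes into onPeriodicEmit().
interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical periodic generator: remembers the highest timestamp seen and, on each
// periodic call, emits it minus a fixed out-of-orderness bound.
class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessGenerator(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // No watermark per event; only track the maximum timestamp.
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return; // nothing seen yet, so nothing to emit
        }
        // Assuming watermark t means "no more events with timestamp <= t", the "- 1"
        // keeps an event exactly maxOutOfOrderness behind the maximum on time.
        output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
    }
}
```

Because the assigner is separate, the same generator can be paired with any extraction logic, for example a lambda such as `(event, recordTimestamp) -> event.getTime()`, where `getTime()` stands for whatever accessor the (hypothetical) record type provides.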

Additionally, we propose to add a generic wrapping WatermarkGenerator
that provides idleness detection, i.e. it can mark a stream/partition as
idle if no data arrives after a configured timeout.
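A generic idleness wrapper could look roughly like the following minimal sketch. The block redefines simplified stand-ins for Watermark, WatermarkOutput and WatermarkGenerator so that it compiles on its own; the Clock interface, the IdlenessDetectingGenerator name and the "mark idle once, resume on the next event" behaviour are assumptions made for illustration, not the design in the FLIP.

```java
// Simplified stand-ins (illustrative only).
final class Watermark {
    private final long timestamp;
    Watermark(long timestamp) { this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical processing-time clock, injectable so the timeout can be tested.
interface Clock {
    long nowMillis();
}

// Hypothetical wrapper: delegates to any generator, but marks the output idle
// when no event has arrived for idleTimeoutMillis of processing time.
class IdlenessDetectingGenerator<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> delegate;
    private final Clock clock;
    private final long idleTimeoutMillis;
    private long lastEventMillis;
    private boolean idle;

    IdlenessDetectingGenerator(WatermarkGenerator<T> delegate, Clock clock, long idleTimeoutMillis) {
        this.delegate = delegate;
        this.clock = clock;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastEventMillis = clock.nowMillis();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastEventMillis = clock.nowMillis();
        idle = false; // data arrived, so the partition counts as active again
        delegate.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (clock.nowMillis() - lastEventMillis >= idleTimeoutMillis) {
            if (!idle) {
                idle = true;
                output.markIdle(); // signal idleness once per idle period
            }
        } else {
            delegate.onPeriodicEmit(output);
        }
    }
}
```

Since the wrapper only relies on the unified interface, it works the same for generators with punctuated or periodic logic. The sketch assumes the framework treats the next emitted watermark as the end of the idle period; how a partition formally becomes active again is not modelled here.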

The "unify and separate" part refers to the fact that we want to unify
punctuated and periodic assigners but at the same time split the
timestamp assigner from the watermark generator.

Please find more details in the FLIP [1]. Looking forward to
your feedback.

Best,
Aljoscha

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners

[2] https://github.com/aljoscha/flink/tree/stephan-event-time

Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Timo Walther
Thanks for the proposal Aljoscha. This is a very useful unification. We
have already considered this FLIP in the interfaces for FLIP-95 [1] and
look forward to updating to the new unified watermark generators once
FLIP-126 has been accepted.

Regards,
Timo

[1] https://github.com/apache/flink/pull/11692

On 20.04.20 18:10, Aljoscha Krettek wrote:


Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Kostas Kloudas
Hi Aljoscha,

Thanks for opening the discussion!

I have two comments on the FLIP:
1) We could add lifecycle methods to the Generator, i.e. open()/
close(), probably with a Context as argument. I have not fully thought
this through, but I think that this is more aligned with the rest of
our rich functions. In addition, it would allow us, for example, to
initialize the Watermark value if we decide to checkpoint the
watermark (see [1]). (I also do not know if Table/SQL needs to do
anything in open().)
2) Related to the above, and with checkpointing the watermark in mind,
I am wondering how we could implement this in the future. In the FLIP,
it is proposed to expose the WatermarkOutput in the methods of the
WatermarkGenerator. Given the implicit contract that watermarks are
non-decreasing, I assume WatermarkOutput#emitWatermark() will have a
check that compares the last emitted WM against the provided one and
emits it only if it is >=. If not, we risk users shooting themselves
in the foot if they accidentally forget the check. Given that the
WatermarkGenerator and its caller do not know whether the watermark
was finally emitted (WatermarkOutput#emitWatermark returns void), who
will be responsible for checkpointing the WM?

Given this, why not define the methods as:

public interface WatermarkGenerator<T> {

    Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);

    Watermark onPeriodicEmit(WatermarkOutput output);
}

and let the caller enforce any invariants, such as non-decreasing
watermarks? That way, the caller can checkpoint anything that is
needed, as it will know for certain whether the WM was emitted or not.

What do you think?

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-5601

On Tue, Apr 21, 2020 at 2:25 PM Timo Walther wrote:

Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

David Anderson
Overall I like this proposal; thanks for bringing it forward, Aljoscha.

I also like the idea of making the Watermark generator a rich function --
this should make it more straightforward to implement smarter watermark
generators, e.g. one that uses state to keep statistics about the actual
out-of-orderness, and uses those statistics to implement a variable delay.
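As a rough illustration of such a generator, here is a minimal sketch that keeps only transient, in-memory statistics: it uses the largest out-of-orderness observed so far as its delay. The stand-in types and the AdaptiveDelayGenerator name are hypothetical, and a real implementation would more likely use a histogram or percentile than a running maximum.

```java
// Simplified stand-ins (illustrative only).
final class Watermark {
    private final long timestamp;
    Watermark(long timestamp) { this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical generator whose delay adapts to the out-of-orderness it observes.
class AdaptiveDelayGenerator<T> implements WatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;
    private long observedDelay; // largest gap seen between maxTimestamp and a late event

    AdaptiveDelayGenerator(long initialDelay) {
        this.observedDelay = initialDelay;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE && eventTimestamp < maxTimestamp) {
            // This event arrived out of order; widen the delay if needed.
            observedDelay = Math.max(observedDelay, maxTimestamp - eventTimestamp);
        }
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(new Watermark(maxTimestamp - observedDelay - 1));
        }
    }
}
```

When the delay grows, the computed watermark can be lower than one emitted earlier, so something on the caller side has to drop regressing watermarks. Because the statistics are not checkpointed, they would be rebuilt from scratch after a restart.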

David

On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas wrote:

Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Aljoscha Krettek
Regarding the WatermarkGenerator (WG) interface itself: the proposal is
basically to turn emitting into a "flatMap". We give the
WatermarkGenerator a "collector" (the WatermarkOutput), and the WG can
decide whether to output a watermark or not and can also mark the output
as idle. Changing the interface to return a Watermark (as the previous
watermark assigner interface did) would not allow that flexibility.
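To illustrate that flexibility, here is a minimal sketch of a punctuated generator written against the collector-style interface: per event it may emit nothing, emit one watermark, or mark the output idle, which a single return value could not express. SensorEvent, its flags and MarkerBasedGenerator are hypothetical, and the interfaces are simplified stand-ins rather than the FLIP's actual classes.

```java
// Simplified stand-ins (illustrative only).
final class Watermark {
    private final long timestamp;
    Watermark(long timestamp) { this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical record type carrying producer-side hints.
final class SensorEvent {
    final boolean watermarkMarker; // producer says event time has advanced to this record
    final boolean producerPaused;  // producer says it stops sending for a while

    SensorEvent(boolean watermarkMarker, boolean producerPaused) {
        this.watermarkMarker = watermarkMarker;
        this.producerPaused = producerPaused;
    }
}

// Hypothetical punctuated generator: all decisions happen per event.
class MarkerBasedGenerator implements WatermarkGenerator<SensorEvent> {
    @Override
    public void onEvent(SensorEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.producerPaused) {
            output.markIdle();
        } else if (event.watermarkMarker) {
            output.emitWatermark(new Watermark(eventTimestamp));
        }
        // Ordinary events produce no output at all.
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Purely punctuated: nothing to do on the periodic timer.
    }
}
```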

Regarding checkpointing the watermark and keeping track of the minimum
watermark, this would be the responsibility of the framework (or the
KafkaConsumer in the current implementation). The user-supplied WG does
not need to make sure the watermark doesn't regress.
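A framework-side wrapper along these lines is one way the caller could keep that responsibility while still handing a WatermarkOutput to the generator, which also answers the question of who checkpoints the WM. This is a hypothetical sketch (MonotonicWatermarkOutput and its snapshot/restore methods are not from the FLIP): it forwards only watermarks that advance and always knows the last emitted value, which is what a checkpoint would need to store.

```java
// Simplified stand-ins (illustrative only).
final class Watermark {
    private final long timestamp;
    Watermark(long timestamp) { this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);
    void markIdle();
}

// Hypothetical framework-side output that enforces non-decreasing watermarks,
// so a user-supplied generator does not have to.
class MonotonicWatermarkOutput implements WatermarkOutput {
    private final WatermarkOutput downstream;
    private long currentWatermark = Long.MIN_VALUE;
    private boolean idle;

    MonotonicWatermarkOutput(WatermarkOutput downstream) {
        this.downstream = downstream;
    }

    @Override
    public void emitWatermark(Watermark watermark) {
        // Drop watermarks that do not advance; equal values would be redundant.
        if (watermark.getTimestamp() <= currentWatermark) {
            return;
        }
        currentWatermark = watermark.getTimestamp();
        idle = false;
        downstream.emitWatermark(watermark);
    }

    @Override
    public void markIdle() {
        if (!idle) {
            idle = true;
            downstream.markIdle();
        }
    }

    // The value a checkpoint would store.
    long snapshotCurrentWatermark() {
        return currentWatermark;
    }

    // Re-establish the lower bound after a restore so older watermarks stay suppressed.
    void restoreCurrentWatermark(long watermark) {
        currentWatermark = watermark;
    }
}
```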

Regarding making the WG a "rich function", I can see the potential
benefit, but I also see a lot of pitfalls. For example, how should the
watermark state be handled in the case of scale-in? It could be made to
work in the Kafka case by attaching the state to the partition state
that we keep, but then we would have potential backwards-compatibility
problems for the WM state as well. Does the WG usually need to keep the
state, or might it be enough if the state is transient? That is, after
a restart the WG would lose its histogram, but it would rebuild it
quickly and you would get back to the same steady state as before.

Best,
Aljoscha

On 27.04.20 12:12, David Anderson wrote:


Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Stephan Ewen
Thanks, Aljoscha, for picking this up.

I agree with the approach of doing the set of changes proposed here for
now. It already makes things simpler and adds idleness support everywhere.

Rich functions and state always add complexity; let's do this in a later
step if we have a really compelling case.


On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek wrote:

Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Jark Wu
Hi,

Regarding `open()/close()`, I think they are necessary for Table&SQL in
order to compile the generated code. In Table&SQL, the watermark strategy
and the event timestamp are defined using SQL expressions, which we
translate into generated Java code. If we have `open()/close()`, we don't
need lazy initialization.
Besides that, I can see a need to report some metrics, e.g. the current
watermark, the dirty timestamps (null values), etc.
So I think a simple `open()/close()` with a context that can provide a
MetricGroup is nice and not complex for the first version.
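A minimal sketch of what such lifecycle methods could look like, assuming a hypothetical GeneratorContext that can register gauges (standing in for access to a MetricGroup). RichWatermarkGenerator, GeneratorContext, Row and SqlStyleGenerator are all made up for this example; in Table&SQL the work done in open() would be compiling the generated code rather than the simple setup shown here.

```java
import java.util.function.LongSupplier;

// Simplified stand-ins (illustrative only).
final class Watermark {
    private final long timestamp;
    Watermark(long timestamp) { this.timestamp = timestamp; }
    long getTimestamp() { return timestamp; }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical context passed to open(); stands in for access to a MetricGroup.
interface GeneratorContext {
    void registerGauge(String name, LongSupplier value);
}

// Hypothetical "rich" variant with lifecycle methods.
interface RichWatermarkGenerator<T> extends WatermarkGenerator<T> {
    default void open(GeneratorContext context) {}
    default void close() {}
}

// Hypothetical row with a nullable rowtime, as a SQL expression might produce.
final class Row {
    final Long rowtime;
    Row(Long rowtime) { this.rowtime = rowtime; }
}

class SqlStyleGenerator implements RichWatermarkGenerator<Row> {
    private final long delay;
    private long maxTimestamp = Long.MIN_VALUE;
    private long currentWatermark = Long.MIN_VALUE;
    private long nullTimestamps;
    private boolean opened;

    SqlStyleGenerator(long delay) { this.delay = delay; }

    @Override
    public void open(GeneratorContext context) {
        // Eager setup here instead of lazy initialization on the first record.
        context.registerGauge("currentWatermark", () -> currentWatermark);
        context.registerGauge("nullTimestamps", () -> nullTimestamps);
        opened = true;
    }

    @Override
    public void onEvent(Row row, long eventTimestamp, WatermarkOutput output) {
        if (!opened) {
            throw new IllegalStateException("open() must be called before onEvent()");
        }
        // The rowtime is read from the row so that null ("dirty") timestamps can be counted.
        if (row.rowtime == null) {
            nullTimestamps++;
            return;
        }
        maxTimestamp = Math.max(maxTimestamp, row.rowtime.longValue());
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return;
        }
        currentWatermark = maxTimestamp - delay;
        output.emitWatermark(new Watermark(currentWatermark));
    }

    @Override
    public void close() { opened = false; }
}
```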

Best,
Jark



On Sun, 10 May 2020 at 00:50, Stephan Ewen <[hidden email]> wrote:

> Thanks, Aljoscha, for picking this up.
>
> I agree with the approach of doing the here proposed set of changes for
> now. It already makes things simpler and adds idleness support everywhere.
>
> Rich functions and state always add complexity, let's do this in a next
> step, if we have a really compelling case.
>
>
> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <[hidden email]>
> wrote:
>
> > Regarding the WatermarkGenerator (WG) interface itself. The proposal is
> > basically to turn emitting into a "flatMap", we give the
> > WatermarkGenerator a "collector" (the WatermarkOutput) and the WG can
> > decide whether to output a watermark or not and can also mark the output
> > as idle. Changing the interface to return a Watermark (as the previous
> > watermark assigner interface did) would not allow that flexibility.
> >
> > Regarding checkpointing the watermark and keeping track of the minimum
> > watermark, this would be the responsibility of the framework (or the
> > KafkaConsumer in the current implementation). The user-supplied WG does
> > not need to make sure the watermark doesn't regress.
> >
> > Regarding making the WG a "rich function", I can see the potential
> > benefit but I also see a lot of pitfalls. For example, how should the
> > watermark state be handled in the case of scale-in? It could be made to
> > work in the Kafka case by attaching the state to the partition state
> > that we keep, but then we have potential backwards compatibility
> > problems also for the WM state. Does the WG usually need to keep the
> > state or might it be enough if the state is transient, i.e. if you have
> > a restart the WG would loose its histogram but it would rebuild it
> > quickly and you would get back to the same steady state as before.
> >
> > Best,
> > Aljoscha
> >
> > On 27.04.20 12:12, David Anderson wrote:
> > > Overall I like this proposal; thanks for bringing it forward, Aljoscha.
> > >
> > > I also like the idea of making the Watermark generator a rich function
> --
> > > this should make it more straightforward to implement smarter watermark
> > > generators. Eg, one that uses state to keep statistics about the actual
> > > out-of-orderness, and uses those statistics to implement a variable
> > delay.
> > >
> > > David
> > >
> > > On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <[hidden email]>
> > wrote:
> > >
> > >> Hi Aljoscha,
> > >>
> > >> Thanks for opening the discussion!
> > >>
> > >> I have two comments on the FLIP:
> > >> 1) We could add lifecycle methods to the Generator, i.e. open()/
> > >> close(), probably with a Context as argument. I have not fully thought
> > >> this through, but I think this is more aligned with the rest of our
> > >> rich functions. In addition, it would allow us, for example, to
> > >> initialize the watermark value if we decide to checkpoint the
> > >> watermark (see [1]). (I also do not know whether Table/SQL needs to do
> > >> anything in open().)
> > >> 2) Aligned with the above, and with checkpointing the watermark in
> > >> mind, I am wondering how we could implement this in the future. The
> > >> FLIP proposes to expose the WatermarkOutput in the methods of the
> > >> WatermarkGenerator. Given the implicit contract that watermarks are
> > >> non-decreasing, I assume WatermarkOutput#emitWatermark() will have a
> > >> check that compares the last emitted WM against the provided one and
> > >> emits the new one only if it is >=. If not, we risk users shooting
> > >> themselves in the foot if they accidentally forget the check. Given
> > >> that neither the WatermarkGenerator nor its caller knows whether the
> > >> watermark was actually emitted (WatermarkOutput#emitWatermark returns
> > >> void), who will be responsible for checkpointing the WM?
> > >>
> > >> Given this, why not define the methods as:
> > >>
> > >> public interface WatermarkGenerator<T> {
> > >>
> > >>      Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
> > >>
> > >>      Watermark onPeriodicEmit(WatermarkOutput output);
> > >> }
> > >>
> > >> and the caller would be the one enforcing any invariants, such as
> > >> non-decreasing watermarks. This way, the caller can checkpoint
> > >> anything that is needed, as it will know exactly whether
> > >> the WM was emitted or not.
> > >>
> > >> What do you think?
> > >>
> > >> Cheers,
> > >> Kostas
> > >>
> > >> [1] https://issues.apache.org/jira/browse/FLINK-5601
> > >>
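For comparison, here is a minimal sketch of the return-value variant proposed above. In this variant the caller enforces monotonicity, so it always knows what to checkpoint. The sketch is simplified and hypothetical in three ways: the `WatermarkOutput` parameter is dropped, `OptionalLong` stands in for an optional `Watermark`, and `ReturningWatermarkGenerator` and `WatermarkCaller` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical, simplified version of the return-value alternative: the generator returns
// a watermark (or empty for "none") instead of writing to an output.
interface ReturningWatermarkGenerator<T> {
    OptionalLong onEvent(T event, long eventTimestamp);
    OptionalLong onPeriodicEmit();
}

// Hypothetical caller: it alone enforces non-decreasing watermarks, so it always knows
// which watermark was last emitted and could put that value into a checkpoint.
class WatermarkCaller<T> {
    private final ReturningWatermarkGenerator<T> generator;
    private long currentWatermark = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    WatermarkCaller(ReturningWatermarkGenerator<T> generator) {
        this.generator = generator;
    }

    void processEvent(T event, long eventTimestamp) {
        maybeEmit(generator.onEvent(event, eventTimestamp));
    }

    void onPeriodicTimer() {
        maybeEmit(generator.onPeriodicEmit());
    }

    // Forward a candidate only if it advances the watermark.
    private void maybeEmit(OptionalLong candidate) {
        if (candidate.isPresent() && candidate.getAsLong() > currentWatermark) {
            currentWatermark = candidate.getAsLong();
            emitted.add(currentWatermark);
        }
    }

    // The value a checkpoint would store.
    long snapshotWatermark() {
        return currentWatermark;
    }
}
```

The trade-off Aljoscha raises in his reply shows up directly here. An empty `OptionalLong` can say "no watermark", but this shape has no way to say "mark this output idle".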
> > >> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <[hidden email]> wrote:
> > >>>
> > >>> Thanks for the proposal, Aljoscha. This is a very useful unification. We
> > >>> have already considered this FLIP in the interfaces for FLIP-95 [1] and
> > >>> look forward to updating to the new unified watermark generators once
> > >>> FLIP-126 has been accepted.
> > >>>
> > >>> Regards,
> > >>> Timo
> > >>>
> > >>> [1] https://github.com/apache/flink/pull/11692
> > >>>
> > >>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> > >>>> Hi Everyone!
> > >>>>
> > >>>> We would like to start a discussion on "FLIP-126: Unify (and separate)
> > >>>> Watermark Assigners" [1]. This work was started by Stephan in an
> > >>>> experimental branch. I expanded on that work to provide a PoC for the
> > >>>> changes proposed in this FLIP: [2].
> > >>>>
> > >>>> Currently, we have two different flavours of Watermark
> > >>>> Assigners: AssignerWithPunctuatedWatermarks
> > >>>> and AssignerWithPeriodicWatermarks. Both of them extend
> > >>>> TimestampAssigner. This means that sources that want to support
> > >>>> watermark assignment/extraction in the source need to support two
> > >>>> separate interfaces, and we have two operator implementations for the
> > >>>> different flavours. Also, this makes features such as generic support
> > >>>> for idleness detection more complicated to implement because we again
> > >>>> have to support two types of watermark assigners.
> > >>>>
> > >>>> In this FLIP we propose two things:
> > >>>>
> > >>>> 1. Unify the Watermark Assigners into one interface, WatermarkGenerator.
> > >>>> 2. Separate this new interface from the TimestampAssigner.
> > >>>>
> > >>>> The motivation for the first is to simplify future implementations and
> > >>>> reduce code duplication. The motivation for the second is again code
> > >>>> deduplication: most assigners currently have to extend from some base
> > >>>> timestamp extractor or duplicate the extraction logic, or users have to
> > >>>> override an abstract method of the watermark assigner to provide the
> > >>>> timestamp extraction logic.
> > >>>>
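The sketch below shows what separating the two concerns could look like. It assumes simplified stand-ins for `TimestampAssigner`, `WatermarkOutput` and `WatermarkGenerator`; the `extractTimestamp(element, recordTimestamp)` shape is an assumption here. It also uses a hypothetical `TimestampsAndWatermarks` glue class. The same generator class is reused with two different timestamp assigners, so nothing has to extend a base extractor.

```java
// Simplified stand-ins (watermarks as plain longs); not Flink's actual classes.
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Watermark logic that knows nothing about how timestamps are extracted.
class AscendingTimestampsGenerator<T> implements WatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(maxTimestamp - 1);
        }
    }
}

// Hypothetical operator glue: extract the timestamp once, then pass it to the generator.
class TimestampsAndWatermarks<T> {
    private final TimestampAssigner<T> assigner;
    private final WatermarkGenerator<T> generator;
    private final WatermarkOutput output;

    TimestampsAndWatermarks(TimestampAssigner<T> assigner, WatermarkGenerator<T> generator,
                            WatermarkOutput output) {
        this.assigner = assigner;
        this.generator = generator;
        this.output = output;
    }

    long process(T element, long recordTimestamp) {
        long timestamp = assigner.extractTimestamp(element, recordTimestamp);
        generator.onEvent(element, timestamp, output);
        return timestamp;
    }

    void onPeriodicTimer() {
        generator.onPeriodicEmit(output);
    }
}
```

Since `TimestampAssigner` has a single abstract method, an assigner can be a lambda. The watermark logic stays in one reusable class.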
> > >>>> Additionally, we propose to add a generic wrapping WatermarkGenerator
> > >>>> that provides idleness detection, i.e. it can mark a stream/partition as
> > >>>> idle if no data arrives after a configured timeout.
> > >>>>
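Here is a minimal sketch of such a wrapper. It is hypothetical, not Flink's implementation. The simplified interfaces, the class name `IdlenessDetectingGenerator`, and the injectable `LongSupplier` clock are all assumptions; the clock is injected so the timeout can be tested without waiting.

```java
import java.util.function.LongSupplier;

// Simplified stand-ins (watermarks as plain longs); not Flink's actual classes.
interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical wrapper: delegates to any generator and marks the output idle once no event
// has arrived for `idleTimeoutMillis`.
class IdlenessDetectingGenerator<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> delegate;
    private final long idleTimeoutMillis;
    private final LongSupplier clock;
    private long lastEventTime;
    private boolean idle;

    IdlenessDetectingGenerator(WatermarkGenerator<T> delegate, long idleTimeoutMillis,
                               LongSupplier clock) {
        this.delegate = delegate;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.clock = clock;
        this.lastEventTime = clock.getAsLong();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastEventTime = clock.getAsLong();
        idle = false;
        delegate.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (clock.getAsLong() - lastEventTime >= idleTimeoutMillis) {
            if (!idle) { // signal idleness once, not on every periodic call
                output.markIdle();
                idle = true;
            }
        } else {
            // Assumption: the output treats the next emitted watermark as "active again".
            delegate.onPeriodicEmit(output);
        }
    }
}
```

The wrapper depends only on the `WatermarkGenerator` interface, so it works for periodic and punctuated generators alike.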
> > >>>> The "unify and separate" part refers to the fact that we want to unify
> > >>>> punctuated and periodic assigners but at the same time split the
> > >>>> timestamp assigner from the watermark generator.
> > >>>>
> > >>>> Please find more details in the FLIP [1]. Looking forward to
> > >>>> your feedback.
> > >>>>
> > >>>> Best,
> > >>>> Aljoscha
> > >>>>
> > >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> > >>>>
> > >>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
> > >>>
> > >>
> > >
> >
> >
>

Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Aljoscha Krettek-2
Ah, I meant to write this in my previous email, sorry about that.

The WatermarkStrategy, which is basically a factory for a
WatermarkGenerator, is the replacement for the open() method. This is the
same approach that was taken for StreamOperatorFactory, which was
introduced to allow code generation in the Table API [1]. If we need
metrics or other things, we would add them as parameters to the factory
method. What do you think?

Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-11974
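Here is a minimal sketch of the factory approach, using simplified interfaces. It assumes a hypothetical `GeneratorContext` with a single `registerGauge` method, which stands in for the metrics access Jark asks for below. `WatermarkStrategy` here is a stand-in, not Flink's class, and `BoundedOutOfOrdernessStrategy` is an illustrative name. One-time setup, such as registering a metric, happens when the framework calls the factory method, so the generator itself needs no `open()`.

```java
import java.io.Serializable;
import java.util.function.LongSupplier;

// Simplified stand-ins (watermarks as plain longs); not Flink's actual classes.
interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical context handed to the factory; here it can only register a gauge.
interface GeneratorContext {
    void registerGauge(String name, LongSupplier value);
}

// Serializable factory: the idea is that it travels with the job and is invoked at runtime,
// once per parallel instance, taking over the role of an open() method.
interface WatermarkStrategy<T> extends Serializable {
    WatermarkGenerator<T> createWatermarkGenerator(GeneratorContext context);
}

class BoundedOutOfOrdernessStrategy<T> implements WatermarkStrategy<T> {
    private final long maxDelay;

    BoundedOutOfOrdernessStrategy(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public WatermarkGenerator<T> createWatermarkGenerator(GeneratorContext context) {
        // One-time initialization: register a metric reading the last emitted watermark.
        final long[] lastWatermark = {Long.MIN_VALUE};
        context.registerGauge("currentWatermark", () -> lastWatermark[0]);
        return new WatermarkGenerator<T>() {
            private long maxTimestamp = Long.MIN_VALUE;

            @Override
            public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
                maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                if (maxTimestamp != Long.MIN_VALUE) {
                    lastWatermark[0] = maxTimestamp - maxDelay - 1;
                    output.emitWatermark(lastWatermark[0]);
                }
            }
        };
    }
}
```

Adding another capability later, such as a code-generation hook, would mean extending the context parameter rather than changing the generator interface.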

On 10.05.20 05:07, Jark Wu wrote:

> Hi,
>
> Regarding `open()/close()`, I think they are necessary for Table&SQL to
> compile the generated code.
> In Table&SQL, the watermark strategy and the event timestamp are defined
> using SQL expressions, which we translate into generated Java code. If we
> have `open()/close()`, we don't need lazy initialization.
> Besides that, I can see a need to report some metrics, e.g. the current
> watermark, the dirty timestamps (null values), etc.
> So I think a simple `open()/close()` with a context that can provide a
> MetricGroup would be nice and not too complex for the first version.
>
> Best,
> Jark
>
>
>
> On Sun, 10 May 2020 at 00:50, Stephan Ewen <[hidden email]> wrote:
>
>> Thanks, Aljoscha, for picking this up.
>>
>> I agree with the approach of doing the set of changes proposed here for
>> now. It already makes things simpler and adds idleness support everywhere.
>>
>> Rich functions and state always add complexity; let's do this in a later
>> step, if we have a really compelling case.
>>
>>
>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <[hidden email]> wrote:
>>
>>> Regarding the WatermarkGenerator (WG) interface itself: the proposal is
>>> basically to turn emitting into a "flatMap". We give the
>>> WatermarkGenerator a "collector" (the WatermarkOutput), and the WG can
>>> decide whether to output a watermark or not and can also mark the output
>>> as idle. Changing the interface to return a Watermark (as the previous
>>> watermark assigner interface did) would not allow that flexibility.
>>>
>>> Regarding checkpointing the watermark and keeping track of the minimum
>>> watermark, this would be the responsibility of the framework (or the
>>> KafkaConsumer in the current implementation). The user-supplied WG does
>>> not need to make sure the watermark doesn't regress.
>>>
>>> Regarding making the WG a "rich function", I can see the potential
>>> benefit, but I also see a lot of pitfalls. For example, how should the
>>> watermark state be handled in the case of scale-in? It could be made to
>>> work in the Kafka case by attaching the state to the partition state
>>> that we keep, but then we would also risk backwards-compatibility
>>> problems for the WM state. Does the WG usually need to keep its
>>> state, or might it be enough if the state is transient? That is, after
>>> a restart the WG would lose its histogram, but it would rebuild it
>>> quickly and get back to the same steady state as before.
>>>
>>> Best,
>>> Aljoscha
>>>
>


Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Aljoscha Krettek-2
We're running a bit short on time. I would propose we vote on the basic
principle and remain open to later additions. This feature is quite
important for making the new Kafka Source that is being developed as part of
FLIP-27 useful. Otherwise we would have to use the legacy interfaces in
the newly added connector.

I know that's a bit unorthodox, but would everyone be OK with what's
currently there, and then we iterate?

Best,
Aljoscha

On 11.05.20 13:57, Aljoscha Krettek wrote:

> Ah, I meant to write this in my previous email, sorry about that.
>
> The WatermarkStrategy, which is basically a factory for a
> WatermarkGenerator, is the replacement for the open() method. This is the
> same approach that was taken for StreamOperatorFactory, which was
> introduced to allow code generation in the Table API [1]. If we need
> metrics or other things, we would add them as parameters to the factory
> method. What do you think?
>
> Best,
> Aljoscha
>
> [1] https://issues.apache.org/jira/browse/FLINK-11974


Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Stephan Ewen
I am fine with that.

Most of the principles seem to be agreed upon. I understand the need to
support code-generated extractors; we should already support most of that
(via the factories, as Aljoscha mentioned) and can extend this if needed.

I think that the factory approach supports code-generated extractors even
more cleanly than an extractor with an open/init method.


On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <[hidden email]> wrote:

> We're running a bit short on time. I would propose we vote on the basic
> principle and remain open to later additions. This feature is quite
> important for making the new Kafka Source that is being developed as part of
> FLIP-27 useful. Otherwise we would have to use the legacy interfaces in
> the newly added connector.
>
> I know that's a bit unorthodox, but would everyone be OK with what's
> currently there, and then we iterate?
>
> Best,
> Aljoscha
>
> On 11.05.20 13:57, Aljoscha Krettek wrote:
> > Ah, I meant to write this in my previous email, sorry about that.
> >
> > The WatermarkStrategy, which is basically a factory for a
> > WatermarkGenerator is the replacement for the open() method. This is the
> > same strategy that was followed for StreamOperatorFactory, which was
> > introduced to allow code generation in the Table API [1]. If we need
> > metrics or other things we would add that as a parameter to the factory
> > method. What do you think?
> >
> > Best,
> > Aljoscha
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-11974
> >
> > On 10.05.20 05:07, Jark Wu wrote:
> >> Hi,
> >>
> >> Regarding to the `open()/close()`, I think it's necessary for
> >> Table&SQL to
> >> compile the generated code.
> >> In Table&SQL, the watermark strategy and event-timestamp is defined
> using
> >> SQL expressions, we will
> >> translate and generate Java code for the expressions. If we have
> >> `open()/close()`, we don't need lazy initialization.
> >> Besides that, I can see a need to report some metrics, e.g. the current
> >> watermark, the dirty timestamps (null value), etc.
> >> So I think a simple `open()/close()` with a context which can get
> >> MetricGroup is nice and not complex for the first version.
> >>
> >> Best,
> >> Jark
> >>
> >>
> >>
> >> On Sun, 10 May 2020 at 00:50, Stephan Ewen <[hidden email]> wrote:
> >>
> >>> Thanks, Aljoscha, for picking this up.
> >>>
> >>> I agree with the approach of doing the here proposed set of changes for
> >>> now. It already makes things simpler and adds idleness support
> >>> everywhere.
> >>>
> >>> Rich functions and state always add complexity, let's do this in a next
> >>> step, if we have a really compelling case.
> >>>
> >>>
> >>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <[hidden email]>
> >>> wrote:
> >>>
> >>>> Regarding the WatermarkGenerator (WG) interface itself. The proposal
> is
> >>>> basically to turn emitting into a "flatMap", we give the
> >>>> WatermarkGenerator a "collector" (the WatermarkOutput) and the WG can
> >>>> decide whether to output a watermark or not and can also mark the
> >>>> output
> >>>> as idle. Changing the interface to return a Watermark (as the previous
> >>>> watermark assigner interface did) would not allow that flexibility.
> >>>>
> >>>> Regarding checkpointing the watermark and keeping track of the minimum
> >>>> watermark, this would be the responsibility of the framework (or the
> >>>> KafkaConsumer in the current implementation). The user-supplied WG
> does
> >>>> not need to make sure the watermark doesn't regress.
> >>>>
> >>>> Regarding making the WG a "rich function", I can see the potential
> >>>> benefit but I also see a lot of pitfalls. For example, how should the
> >>>> watermark state be handled in the case of scale-in? It could be made
> to
> >>>> work in the Kafka case by attaching the state to the partition state
> >>>> that we keep, but then we have potential backwards compatibility
> >>>> problems also for the WM state. Does the WG usually need to keep the
> >>>> state or might it be enough if the state is transient, i.e. if you
> have
> >>>> a restart the WG would loose its histogram but it would rebuild it
> >>>> quickly and you would get back to the same steady state as before.
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>> On 27.04.20 12:12, David Anderson wrote:
> >>>>> Overall I like this proposal; thanks for bringing it forward,
> >>>>> Aljoscha.
> >>>>>
> >>>>> I also like the idea of making the Watermark generator a rich
> function
> >>> --
> >>>>> this should make it more straightforward to implement smarter
> >>>>> watermark
> >>>>> generators. Eg, one that uses state to keep statistics about the
> >>>>> actual
> >>>>> out-of-orderness, and uses those statistics to implement a variable
> >>>> delay.
> >>>>>
> >>>>> David
> >>>>>
> >>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <[hidden email]>
> >>>> wrote:
> >>>>>
> >>>>>> Hi Aljoscha,
> >>>>>>
> >>>>>> Thanks for opening the discussion!
> >>>>>>
> >>>>>> I have two comments on the FLIP:
> >>>>>> 1) we could add lifecycle methods to the Generator, i.e. open()/
> >>>>>> close(), probably with a Context as argument: I have not fully
> >>>>>> thought
> >>>>>> this through but I think that this is more aligned with the rest of
> >>>>>> our rich functions. In addition, it will allow, for example, to
> >>>>>> initialize the Watermark value, if we decide to checkpoint the
> >>>>>> watermark (see [1]) (I also do not know if Table/SQL needs to do
> >>>>>> anything in the open()).
> >>>>>> 2) aligned with the above, and with the case where we want to
> >>>>>> checkpoint the watermark in mind, I am wondering about how we could
> >>>>>> implement this in the future. In the FLIP, it is proposed to expose
> >>>>>> the WatermarkOutput in the methods of the WatermarkGenerator. Given
> >>>>>> that there is the implicit contract that watermarks are
> >>>>>> non-decreasing, the WatermarkOutput#emitWatermark() will have (I
> >>>>>> assume) a check that will compare the last emitted WM against the
> >>>>>> provided one, and emit it only if it is >=. If not, then we risk
> >>>>>> having the user shooting himself on the foot if he/she accidentally
> >>>>>> forgets the check. Given that the WatermarkGenerator and its
> >>>>>> caller do
> >>>>>> not know if the watermark was finally emitted or not (the
> >>>>>> WatermarkOutput#emitWatermark returns void), who will be responsible
> >>>>>> for checkpointing the WM?
> >>>>>>
> >>>>>> Given this, why not having the methods as:
> >>>>>>
> >>>>>> public interface WatermarkGenerator<T> {
> >>>>>>
> >>>>>>       Watermark onEvent(T event, long eventTimestamp,
> WatermarkOutput
> >>>>>> output);
> >>>>>>
> >>>>>>       Watermark onPeriodicEmit(WatermarkOutput output);
> >>>>>> }
> >>>>>>
> >>>>>> and the caller will be the one enforcing any invariants, such as
> >>>>>> non-decreasing watermarks. In this way, the caller can checkpoint
> >>>>>> anything that is needed as it will have complete knowledge as to if
> >>>>>> the WM was emitted or not.
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Kostas
> >>>>>>
> >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
> >>>>>>
> >>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <[hidden email]>
> >>>> wrote:
> >>>>>>>
> >>>>>>> Thanks for the proposal Aljoscha. This is a very useful
> unification.
> >>> We
> >>>>>>> have considered this FLIP already in the interfaces for FLIP-95 [1]
> >>> and
> >>>>>>> look forward to update to the new unified watermark generators once
> >>>>>>> FLIP-126 has been accepted.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>> [1] https://github.com/apache/flink/pull/11692
> >>>>>>>
> >>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> >>>>>>>> Hi Everyone!
> >>>>>>>>
> >>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and separate)
> >>>>>>>> Watermark Assigners" [1]. This work was started by Stephan in an
> >>>>>>>> experimental branch. I expanded on that work to provide a PoC for the
> >>>>>>>> changes proposed in this FLIP: [2].
> >>>>>>>>
> >>>>>>>> Currently, we have two different flavours of Watermark
> >>>>>>>> Assigners: AssignerWithPunctuatedWatermarks
> >>>>>>>> and AssignerWithPeriodicWatermarks. Both of them extend
> >>>>>>>> TimestampAssigner. This means that sources that want to support
> >>>>>>>> watermark assignment/extraction in the source need to support two
> >>>>>>>> separate interfaces, and we have two operator implementations for the
> >>>>>>>> different flavours. Also, this makes features such as generic support
> >>>>>>>> for idleness detection more complicated to implement because we again
> >>>>>>>> have to support two types of watermark assigners.
> >>>>>>>>
> >>>>>>>> In this FLIP we propose two things:
> >>>>>>>>
> >>>>>>>> 1) Unify the Watermark Assigners into one interface, WatermarkGenerator.
> >>>>>>>> 2) Separate this new interface from the TimestampAssigner.
> >>>>>>>>
> >>>>>>>> The motivation for the first is to simplify future implementations and
> >>>>>>>> reduce code duplication. The motivation for the second point is again
> >>>>>>>> code deduplication: most assigners currently have to extend from some
> >>>>>>>> base timestamp extractor or duplicate the extraction logic, or users
> >>>>>>>> have to override an abstract method of the watermark assigner to
> >>>>>>>> provide the timestamp extraction logic.
> >>>>>>>>
> >>>>>>>> Additionally, we propose to add a generic wrapping WatermarkGenerator
> >>>>>>>> that provides idleness detection, i.e. it can mark a stream/partition
> >>>>>>>> as idle if no data arrives within a configured timeout.
> >>>>>>>>
> >>>>>>>> The "unify and separate" part refers to the fact that we want to unify
> >>>>>>>> punctuated and periodic assigners but at the same time split the
> >>>>>>>> timestamp assigner from the watermark generator.
> >>>>>>>>
> >>>>>>>> Please find more details in the FLIP [1]. Looking forward to
> >>>>>>>> your feedback.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Aljoscha
> >>>>>>>>
> >>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> >>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >
>
>
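Kostas's alternative quoted above (the generator returns a candidate watermark and the caller enforces the invariants) can be illustrated with a minimal, self-contained Java sketch. It assumes hypothetical stand-ins, not Flink's API: `CandidateWatermarkGenerator` returns a nullable `Long` meaning "no watermark", and `Caller` plays the framework role. The sketch forwards a candidate only if it is strictly greater than the last emitted one (Kostas mentions `>=`; strict `>` just avoids re-emitting equal values).

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "return a watermark, let the caller enforce invariants" variant.
// All type names here are hypothetical stand-ins, not Flink classes.
final class CallerEnforcedWatermarks {

    /** Hypothetical generator that proposes a watermark, or null for "none". */
    interface CandidateWatermarkGenerator<T> {
        Long onEvent(T event, long eventTimestamp);
        Long onPeriodicEmit();
    }

    /** Caller side: forwards only advancing watermarks and remembers the last one. */
    static final class Caller<T> {
        private final CandidateWatermarkGenerator<T> generator;
        private final List<Long> emitted = new ArrayList<>();
        private long lastEmitted = Long.MIN_VALUE; // the value a checkpoint would store

        Caller(CandidateWatermarkGenerator<T> generator) {
            this.generator = generator;
        }

        void processEvent(T event, long eventTimestamp) {
            maybeEmit(generator.onEvent(event, eventTimestamp));
        }

        void onPeriodicTimer() {
            maybeEmit(generator.onPeriodicEmit());
        }

        private void maybeEmit(Long candidate) {
            // The non-decreasing check lives here, so user code cannot forget it,
            // and the caller always knows whether a watermark actually went out.
            if (candidate != null && candidate > lastEmitted) {
                lastEmitted = candidate;
                emitted.add(candidate);
            }
        }

        long snapshotLastEmitted() {
            return lastEmitted;
        }

        List<Long> emitted() {
            return emitted;
        }
    }

    static List<Long> demo() {
        // Naive punctuated-style generator: proposes each event's timestamp.
        CandidateWatermarkGenerator<String> naive = new CandidateWatermarkGenerator<String>() {
            @Override public Long onEvent(String event, long eventTimestamp) {
                return eventTimestamp;
            }

            @Override public Long onPeriodicEmit() {
                return null;
            }
        };
        Caller<String> caller = new Caller<>(naive);
        caller.processEvent("a", 10);
        caller.processEvent("b", 7);  // out of order: 7 would regress, so it is dropped
        caller.processEvent("c", 12);
        caller.onPeriodicTimer();     // the generator proposes nothing here
        return caller.emitted();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [10, 12]
    }
}
```

Because only the caller decides what goes out, `snapshotLastEmitted()` always reflects a watermark that was actually emitted, which is the property Kostas wants for checkpointing. The trade-off raised elsewhere in the thread is that a plain return value cannot also express "mark this output idle".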

Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Jark Wu-2
Thanks for the explanation. I like the factory pattern, as it makes the member
variables immutable and final.

So +1 to the proposal.

Best,
Jark
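Jark's remark about the factory pattern can be made concrete with a small sketch. It assumes hypothetical `Strategy`, `CreationContext` and `Generator` types (simplified stand-ins, not Flink's actual `WatermarkStrategy` classes): only the factory is `Serializable`, and it passes per-task dependencies, here an assumed metric hook, into the generator's constructor, so every configuration field can be `final` and no `open()` method is needed.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Sketch of "a serializable factory creates the generator on the task".
// Strategy, CreationContext, Generator and DelayedGenerator are hypothetical names.
final class FactorySketch {

    /** Hypothetical per-task context; a metric hook is one thing it could carry. */
    interface CreationContext {
        LongConsumer currentWatermarkGauge();
    }

    interface Generator<T> {
        void onEvent(T event, long eventTimestamp, LongConsumer output);
    }

    /** Only the factory is shipped, so only it has to be Serializable. */
    interface Strategy<T> extends Serializable {
        Generator<T> createGenerator(CreationContext context);
    }

    /** All configuration is final and set in the constructor: no open(), no lazy init. */
    static final class DelayedGenerator<T> implements Generator<T> {
        private final long delay;
        private final LongConsumer gauge;
        private long maxTimestamp = Long.MIN_VALUE;

        DelayedGenerator(long delay, LongConsumer gauge) {
            this.delay = delay;
            this.gauge = gauge;
        }

        @Override
        public void onEvent(T event, long eventTimestamp, LongConsumer output) {
            if (eventTimestamp > maxTimestamp) {
                maxTimestamp = eventTimestamp;
                output.accept(maxTimestamp - delay);
                gauge.accept(maxTimestamp - delay); // report the new watermark as a metric
            }
        }
    }

    static String demo() {
        long[] gaugeValue = {Long.MIN_VALUE};
        List<Long> emitted = new ArrayList<>();

        Strategy<String> strategy =
                ctx -> new DelayedGenerator<String>(5, ctx.currentWatermarkGauge());
        CreationContext context = () -> (v -> gaugeValue[0] = v);

        Generator<String> generator = strategy.createGenerator(context);
        generator.onEvent("a", 100, v -> emitted.add(v));
        generator.onEvent("b", 90, v -> emitted.add(v)); // older event: no new watermark
        generator.onEvent("c", 120, v -> emitted.add(v));
        return "emitted=" + emitted + " gauge=" + gaugeValue[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // emitted=[95, 115] gauge=115
    }
}
```

For the Table/SQL case, one could imagine compiling the generated code inside `createGenerator`, which is where an `open()` method would otherwise do it; that is an assumption about usage, not something the thread specifies.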

On Mon, 11 May 2020 at 22:01, Stephan Ewen <[hidden email]> wrote:

> I am fine with that.
>
> Most of the principles seem agreed upon. I understand the need to support
> code-generated extractors, and we should support most of that already (as
> Aljoscha mentioned, via the factories) and can extend this if needed.
>
> I think that the factory approach supports code-generated extractors in a
> cleaner way than even an extractor with an open/init method.
>
>
> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <[hidden email]> wrote:
>
> > We're slightly running out of time. I would propose we vote on the basic
> > principle and remain open to later additions. This feature is quite
> > important to make the new Kafka Source that is developed as part of
> > FLIP-27 useful. Otherwise we would have to use the legacy interfaces in
> > the newly added connector.
> >
> > I know that's a bit unorthodox but would everyone be OK with what's
> > currently there and then we iterate?
> >
> > Best,
> > Aljoscha
> >
> > On 11.05.20 13:57, Aljoscha Krettek wrote:
> > > Ah, I meant to write this in my previous email, sorry about that.
> > >
> > > The WatermarkStrategy, which is basically a factory for a
> > > WatermarkGenerator, is the replacement for the open() method. This is the
> > > same strategy that was followed for StreamOperatorFactory, which was
> > > introduced to allow code generation in the Table API [1]. If we need
> > > metrics or other things, we would add them as a parameter to the factory
> > > method. What do you think?
> > >
> > > Best,
> > > Aljoscha
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-11974
> > >
> > > On 10.05.20 05:07, Jark Wu wrote:
> > >> Hi,
> > >>
> > >> Regarding `open()/close()`, I think they are necessary for Table&SQL to
> > >> compile the generated code.
> > >> In Table&SQL, the watermark strategy and the event timestamp are defined
> > >> using SQL expressions; we translate them and generate Java code for the
> > >> expressions. If we have `open()/close()`, we don't need lazy
> > >> initialization.
> > >> Besides that, I can see a need to report some metrics, e.g. the current
> > >> watermark, the dirty timestamps (null values), etc.
> > >> So I think a simple `open()/close()` with a context that can provide a
> > >> MetricGroup is nice and not too complex for the first version.
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >>
> > >>
> > >> On Sun, 10 May 2020 at 00:50, Stephan Ewen <[hidden email]> wrote:
> > >>
> > >>> Thanks, Aljoscha, for picking this up.
> > >>>
> > >>> I agree with the approach of doing the set of changes proposed here for
> > >>> now. It already makes things simpler and adds idleness support
> > >>> everywhere.
> > >>>
> > >>> Rich functions and state always add complexity; let's do this in a next
> > >>> step, if we have a really compelling case.
> > >>>
> > >>>
> > >>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <[hidden email]> wrote:
> > >>>
> > >>>> Regarding the WatermarkGenerator (WG) interface itself: the proposal is
> > >>>> basically to turn emitting into a "flatMap". We give the
> > >>>> WatermarkGenerator a "collector" (the WatermarkOutput), and the WG can
> > >>>> decide whether to output a watermark or not and can also mark the output
> > >>>> as idle. Changing the interface to return a Watermark (as the previous
> > >>>> watermark assigner interface did) would not allow that flexibility.
> > >>>>
> > >>>> Regarding checkpointing the watermark and keeping track of the minimum
> > >>>> watermark, this would be the responsibility of the framework (or the
> > >>>> KafkaConsumer in the current implementation). The user-supplied WG does
> > >>>> not need to make sure the watermark doesn't regress.
> > >>>>
> > >>>> Regarding making the WG a "rich function", I can see the potential
> > >>>> benefit, but I also see a lot of pitfalls. For example, how should the
> > >>>> watermark state be handled in the case of scale-in? It could be made to
> > >>>> work in the Kafka case by attaching the state to the partition state
> > >>>> that we keep, but then we also have potential backwards compatibility
> > >>>> problems for the WM state. Does the WG usually need to keep the state,
> > >>>> or might it be enough if the state is transient? That is, on a restart
> > >>>> the WG would lose its histogram, but it would rebuild it quickly and
> > >>>> you would get back to the same steady state as before.
> > >>>>
> > >>>> Best,
> > >>>> Aljoscha
> > >>>>
> > >>>> On 27.04.20 12:12, David Anderson wrote:
> > >>>>> Overall I like this proposal; thanks for bringing it forward,
> > >>>>> Aljoscha.
> > >>>>>
> > >>>>> I also like the idea of making the Watermark generator a rich function;
> > >>>>> this should make it more straightforward to implement smarter watermark
> > >>>>> generators, e.g. one that uses state to keep statistics about the actual
> > >>>>> out-of-orderness and uses those statistics to implement a variable delay.
> > >>>>>
> > >>>>> David
> > >>>>>
> > >>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <[hidden email]> wrote:
> > >>>>>
> > >>>>>> Hi Aljoscha,
> > >>>>>>
> > >>>>>> Thanks for opening the discussion!
> > >>>>>>
> > >>>>>> I have two comments on the FLIP:
> > >>>>>> 1) We could add lifecycle methods to the Generator, i.e. open()/
> > >>>>>> close(), probably with a Context as argument. I have not fully thought
> > >>>>>> this through, but I think that this is more aligned with the rest of
> > >>>>>> our rich functions. In addition, it would allow, for example,
> > >>>>>> initializing the Watermark value if we decide to checkpoint the
> > >>>>>> watermark (see [1]). (I also do not know if Table/SQL needs to do
> > >>>>>> anything in open().)
> > >>>>>> 2) Aligned with the above, and with the case where we want to
> > >>>>>> checkpoint the watermark in mind, I am wondering how we could
> > >>>>>> implement this in the future. In the FLIP, it is proposed to expose
> > >>>>>> the WatermarkOutput in the methods of the WatermarkGenerator. Given
> > >>>>>> the implicit contract that watermarks are non-decreasing, the
> > >>>>>> WatermarkOutput#emitWatermark() will (I assume) have a check that
> > >>>>>> compares the last emitted WM against the provided one and emits it
> > >>>>>> only if it is >=. Otherwise, we risk users shooting themselves in the
> > >>>>>> foot if they accidentally forget the check. Given that the
> > >>>>>> WatermarkGenerator and its caller do not know whether the watermark
> > >>>>>> was finally emitted or not (WatermarkOutput#emitWatermark returns
> > >>>>>> void), who will be responsible for checkpointing the WM?
> > >>>>>>
> > >>>>>> Given this, why not have the methods as:
> > >>>>>>
> > >>>>>> public interface WatermarkGenerator<T> {
> > >>>>>>
> > >>>>>>       Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
> > >>>>>>
> > >>>>>>       Watermark onPeriodicEmit(WatermarkOutput output);
> > >>>>>> }
> > >>>>>>
> > >>>>>> and the caller will be the one enforcing any invariants, such as
> > >>>>>> non-decreasing watermarks. In this way, the caller can checkpoint
> > >>>>>> anything that is needed, as it will have complete knowledge of whether
> > >>>>>> the WM was emitted or not.
> > >>>>>>
> > >>>>>> What do you think?
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>> Kostas
> > >>>>>>
> > >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
> > >>>>>>
> > >>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <[hidden email]> wrote:
> > >>>>>>>
> > >>>>>>> Thanks for the proposal, Aljoscha. This is a very useful unification. We
> > >>>>>>> have considered this FLIP already in the interfaces for FLIP-95 [1] and
> > >>>>>>> look forward to updating to the new unified watermark generators once
> > >>>>>>> FLIP-126 has been accepted.
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Timo
> > >>>>>>>
> > >>>>>>> [1] https://github.com/apache/flink/pull/11692
> > >>>>>>>
> > >>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> > >>>>>>>> Hi Everyone!
> > >>>>>>>>
> > >>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and separate)
> > >>>>>>>> Watermark Assigners" [1]. This work was started by Stephan in an
> > >>>>>>>> experimental branch. I expanded on that work to provide a PoC for the
> > >>>>>>>> changes proposed in this FLIP: [2].
> > >>>>>>>>
> > >>>>>>>> Currently, we have two different flavours of Watermark
> > >>>>>>>> Assigners: AssignerWithPunctuatedWatermarks
> > >>>>>>>> and AssignerWithPeriodicWatermarks. Both of them extend
> > >>>>>>>> TimestampAssigner. This means that sources that want to support
> > >>>>>>>> watermark assignment/extraction in the source need to support two
> > >>>>>>>> separate interfaces, and we have two operator implementations for the
> > >>>>>>>> different flavours. Also, this makes features such as generic support
> > >>>>>>>> for idleness detection more complicated to implement because we again
> > >>>>>>>> have to support two types of watermark assigners.
> > >>>>>>>>
> > >>>>>>>> In this FLIP we propose two things:
> > >>>>>>>>
> > >>>>>>>> 1) Unify the Watermark Assigners into one interface, WatermarkGenerator.
> > >>>>>>>> 2) Separate this new interface from the TimestampAssigner.
> > >>>>>>>>
> > >>>>>>>> The motivation for the first is to simplify future implementations and
> > >>>>>>>> reduce code duplication. The motivation for the second point is again
> > >>>>>>>> code deduplication: most assigners currently have to extend from some
> > >>>>>>>> base timestamp extractor or duplicate the extraction logic, or users
> > >>>>>>>> have to override an abstract method of the watermark assigner to
> > >>>>>>>> provide the timestamp extraction logic.
> > >>>>>>>>
> > >>>>>>>> Additionally, we propose to add a generic wrapping WatermarkGenerator
> > >>>>>>>> that provides idleness detection, i.e. it can mark a stream/partition
> > >>>>>>>> as idle if no data arrives within a configured timeout.
> > >>>>>>>>
> > >>>>>>>> The "unify and separate" part refers to the fact that we want to unify
> > >>>>>>>> punctuated and periodic assigners but at the same time split the
> > >>>>>>>> timestamp assigner from the watermark generator.
> > >>>>>>>>
> > >>>>>>>> Please find more details in the FLIP [1]. Looking forward to
> > >>>>>>>> your feedback.
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>> Aljoscha
> > >>>>>>>>
> > >>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> > >>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >
> >
> >
>
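The collector-style interface Aljoscha describes above (the generator receives a `WatermarkOutput` and may emit a watermark or mark the output idle), together with the generic idleness-detecting wrapper from the original proposal, could look roughly like the following. It is a self-contained sketch under stated assumptions: the interfaces mirror the `onEvent`/`onPeriodicEmit` shape discussed in the thread but are simplified stand-ins, not Flink's classes; watermarks are plain `long`s; and the processing-time clock is injected so the example is deterministic. As Aljoscha notes, keeping watermarks from regressing is left to the framework, so `RecordingOutput` does no such check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Simplified stand-ins for the interfaces discussed in the thread; not Flink's classes.
final class UnifiedGeneratorSketch {

    /** "Collector" handed to the generator: emit a watermark, or mark the output idle. */
    interface WatermarkOutput {
        void emitWatermark(long watermark);
        void markIdle();
    }

    /** One interface for both styles: punctuated logic in onEvent, periodic in onPeriodicEmit. */
    interface WatermarkGenerator<T> {
        void onEvent(T event, long eventTimestamp, WatermarkOutput output);
        void onPeriodicEmit(WatermarkOutput output);
    }

    /** Periodic style: remembers the max timestamp, emits (max - bound) on each tick. */
    static final class BoundedOutOfOrderness<T> implements WatermarkGenerator<T> {
        private final long bound;
        private long maxTimestamp = Long.MIN_VALUE;

        BoundedOutOfOrderness(long bound) {
            this.bound = bound;
        }

        @Override public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        @Override public void onPeriodicEmit(WatermarkOutput output) {
            if (maxTimestamp != Long.MIN_VALUE) {
                output.emitWatermark(maxTimestamp - bound);
            }
        }
    }

    /** Generic wrapper: marks the output idle if no event arrived within the timeout. */
    static final class WithIdleness<T> implements WatermarkGenerator<T> {
        private final WatermarkGenerator<T> inner;
        private final long idleTimeoutMillis;
        private final LongSupplier clock; // injected so the sketch is deterministic
        private long lastEventTime;

        WithIdleness(WatermarkGenerator<T> inner, long idleTimeoutMillis, LongSupplier clock) {
            this.inner = inner;
            this.idleTimeoutMillis = idleTimeoutMillis;
            this.clock = clock;
            this.lastEventTime = clock.getAsLong();
        }

        @Override public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            lastEventTime = clock.getAsLong();
            inner.onEvent(event, eventTimestamp, output);
        }

        @Override public void onPeriodicEmit(WatermarkOutput output) {
            if (clock.getAsLong() - lastEventTime >= idleTimeoutMillis) {
                output.markIdle();
            } else {
                inner.onPeriodicEmit(output);
            }
        }
    }

    /** Stands in for the framework side and just records what happened. */
    static final class RecordingOutput implements WatermarkOutput {
        final List<String> log = new ArrayList<>();
        @Override public void emitWatermark(long watermark) { log.add("wm:" + watermark); }
        @Override public void markIdle() { log.add("idle"); }
    }

    static List<String> demo() {
        long[] now = {0}; // manual processing-time clock, in milliseconds
        WatermarkGenerator<String> generator =
                new WithIdleness<String>(new BoundedOutOfOrderness<String>(5), 1_000, () -> now[0]);
        RecordingOutput output = new RecordingOutput();

        generator.onEvent("a", 100, output);
        generator.onEvent("b", 98, output);   // out of order, but within the bound
        now[0] = 200;
        generator.onPeriodicEmit(output);     // recent activity: emits 100 - 5 = 95
        now[0] = 1_500;
        generator.onPeriodicEmit(output);     // 1.5 s without events: marks idle
        return output.log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [wm:95, idle]
    }
}
```

Punctuated logic would emit from `onEvent` and periodic logic from `onPeriodicEmit`, so one interface covers both. In this sketch the wrapper simply resumes delegating once events arrive again; how the real framework signals that an idle stream became active is not shown.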

Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

dwysakowicz
Hi Aljoscha,

Sorry for adding comments during the vote, but I have some really minor
suggestions that should not influence the voting thread imo.

1) Does it make sense to have the TimestampAssigner extend Flink's
Function? This implies it has to be serializable, which is not strictly
necessary with the factory pattern, right? BTW, I really like that you
suggested the @FunctionalInterface annotation there.

2) Could we rename the IdentityTimestampAssigner to e.g.
RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...
Personally, I found the name IdentityTimestampAssigner a bit misleading, as
"identity" usually means a no-op. It did not click for me, because I assumed
it somehow returns the incoming record itself.

3) Could we rename the second parameter of TimestampAssigner#extract to
e.g. recordTimestamp/nativeTimestamp? This is similar to the point
above. This parameter also confused me a bit, as at times I thought it
was somehow related to
TimerService#currentProcessingTime()/currentWatermark(), i.e. a
system-wide current timestamp.
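Dawid's three suggestions can be illustrated with a minimal sketch built on hypothetical stand-ins rather than the classes in the PoC: a `@FunctionalInterface` that does not extend `Serializable` (with a factory, only the factory is shipped), an `extractTimestamp` method whose second parameter is named `recordTimestamp` (the method name is assumed here), and `RecordTimestampAssigner` as the clearer name for an assigner that keeps the timestamp already attached to the record. `Click` is a hypothetical event type.

```java
// Hypothetical stand-ins illustrating the suggested naming; not the PoC's classes.
final class TimestampAssignerSketch {

    /** Plain functional interface: no Serializable, since a factory creates it. */
    @FunctionalInterface
    interface TimestampAssigner<T> {
        /** recordTimestamp: the timestamp already attached to the record, if any. */
        long extractTimestamp(T element, long recordTimestamp);
    }

    /** Keeps the record's existing timestamp (the role of "IdentityTimestampAssigner"). */
    static final class RecordTimestampAssigner<T> implements TimestampAssigner<T> {
        @Override
        public long extractTimestamp(T element, long recordTimestamp) {
            return recordTimestamp;
        }
    }

    /** Hypothetical event type carrying its own event time. */
    static final class Click {
        final long eventTimeMillis;

        Click(long eventTimeMillis) {
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    static long[] demo() {
        Click click = new Click(1_000);
        long attachedBySource = 2_000; // e.g. a timestamp the source already put on the record

        TimestampAssigner<Click> fromField = (c, recordTimestamp) -> c.eventTimeMillis;
        TimestampAssigner<Click> keepRecord = new RecordTimestampAssigner<>();

        return new long[] {
            fromField.extractTimestamp(click, attachedBySource),
            keepRecord.extractTimestamp(click, attachedBySource)
        };
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println(r[0] + " " + r[1]); // 1000 2000
    }
}
```

With the parameter named `recordTimestamp`, it is clear that the value comes from the record itself (for example, a timestamp the source already attached), not from `TimerService`.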

Other than those three points, I like the proposal, and I would have voted
+1 if it were not for them.

Best,

Dawid




Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Stephan Ewen
+1 to all of Dawid's suggestions, makes a lot of sense to me

On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz <[hidden email]> wrote:

> Hi Aljoscha,
>
> Sorry for adding comments during the vote, but I have some really minor
> suggestions that should not influence the voting thread imo.
>
> 1) Does it make sense to have the TimestampAssigner extend Flink's
> Function? This implies it has to be serializable, which is not strictly
> necessary with the factory pattern, right? BTW, I really like that you
> suggested the @FunctionalInterface annotation there.
>
> 2) Could we rename the IdentityTimestampAssigner to e.g.
> RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...
> Personally, I found the name IdentityTimestampAssigner a bit misleading, as
> "identity" usually means a no-op. It did not click for me, because I assumed
> it somehow returns the incoming record itself.
>
> 3) Could we rename the second parameter of TimestampAssigner#extract to
> e.g. recordTimestamp/nativeTimestamp? This is similar to the point
> above. This parameter also confused me a bit, as at times I thought it
> was somehow related to
> TimerService#currentProcessingTime()/currentWatermark(), i.e. a
> system-wide current timestamp.
>
> Other than those three points, I like the proposal, and I would have voted
> +1 if it were not for them.
>
> Best,
>
> Dawid
>
> On 11/05/2020 16:57, Jark Wu wrote:
> > Thanks for the explanation. I like the factory pattern, as it makes the member
> > variables immutable and final.
> >
> > So +1 to the proposal.
> >
> > Best,
> > Jark
> >
> > On Mon, 11 May 2020 at 22:01, Stephan Ewen <[hidden email]> wrote:
> >
> >> I am fine with that.
> >>
> >> Most of the principles seem agreed upon. I understand the need to support
> >> code-generated extractors, and we should support most of that already (as
> >> Aljoscha mentioned, via the factories) and can extend this if needed.
> >>
> >> I think that the factory approach supports code-generated extractors in a
> >> cleaner way than even an extractor with an open/init method.
> >>
> >>
> >> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <[hidden email]> wrote:
> >>
> >>> We're slightly running out of time. I would propose we vote on the basic
> >>> principle and remain open to later additions. This feature is quite
> >>> important to make the new Kafka Source that is developed as part of
> >>> FLIP-27 useful. Otherwise we would have to use the legacy interfaces in
> >>> the newly added connector.
> >>>
> >>> I know that's a bit unorthodox but would everyone be OK with what's
> >>> currently there and then we iterate?
> >>>
> >>> Best,
> >>> Aljoscha
> >>>
> >>> On 11.05.20 13:57, Aljoscha Krettek wrote:
> >>>> Ah, I meant to write this in my previous email, sorry about that.
> >>>>
> >>>> The WatermarkStrategy, which is basically a factory for a
> >>>> WatermarkGenerator, is the replacement for the open() method. This is the
> >>>> same strategy that was followed for StreamOperatorFactory, which was
> >>>> introduced to allow code generation in the Table API [1]. If we need
> >>>> metrics or other things, we would add them as a parameter to the factory
> >>>> method. What do you think?
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>> [1] https://issues.apache.org/jira/browse/FLINK-11974
> >>>>
> >>>> On 10.05.20 05:07, Jark Wu wrote:
> >>>>> Hi,
> >>>>>
> >>>>> Regarding `open()/close()`, I think they are necessary for Table&SQL to
> >>>>> compile the generated code.
> >>>>> In Table&SQL, the watermark strategy and the event timestamp are defined
> >>>>> using SQL expressions; we translate them and generate Java code for the
> >>>>> expressions. If we have `open()/close()`, we don't need lazy
> >>>>> initialization.
> >>>>> Besides that, I can see a need to report some metrics, e.g. the current
> >>>>> watermark, the dirty timestamps (null values), etc.
> >>>>> So I think a simple `open()/close()` with a context that can provide a
> >>>>> MetricGroup is nice and not too complex for the first version.
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Sun, 10 May 2020 at 00:50, Stephan Ewen <[hidden email]> wrote:
> >>>>>
> >>>>>> Thanks, Aljoscha, for picking this up.
> >>>>>>
> >>>>>> I agree with the approach of doing the set of changes proposed here for
> >>>>>> now. It already makes things simpler and adds idleness support
> >>>>>> everywhere.
> >>>>>>
> >>>>>> Rich functions and state always add complexity; let's do this in a next
> >>>>>> step, if we have a really compelling case.
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <[hidden email]> wrote:
> >>>>>>
> >>>>>>> Regarding the WatermarkGenerator (WG) interface itself: the proposal is
> >>>>>>> basically to turn emitting into a "flatMap". We give the
> >>>>>>> WatermarkGenerator a "collector" (the WatermarkOutput), and the WG can
> >>>>>>> decide whether to output a watermark or not and can also mark the output
> >>>>>>> as idle. Changing the interface to return a Watermark (as the previous
> >>>>>>> watermark assigner interface did) would not allow that flexibility.
> >>>>>>>
> >>>>>>> Regarding checkpointing the watermark and keeping track of the minimum
> >>>>>>> watermark, this would be the responsibility of the framework (or the
> >>>>>>> KafkaConsumer in the current implementation). The user-supplied WG does
> >>>>>>> not need to make sure the watermark doesn't regress.
> >>>>>>>
> >>>>>>> Regarding making the WG a "rich function", I can see the potential
> >>>>>>> benefit, but I also see a lot of pitfalls. For example, how should the
> >>>>>>> watermark state be handled in the case of scale-in? It could be made to
> >>>>>>> work in the Kafka case by attaching the state to the partition state
> >>>>>>> that we keep, but then we also have potential backwards compatibility
> >>>>>>> problems for the WM state. Does the WG usually need to keep the state,
> >>>>>>> or might it be enough if the state is transient? That is, on a restart
> >>>>>>> the WG would lose its histogram, but it would rebuild it quickly and
> >>>>>>> you would get back to the same steady state as before.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Aljoscha
> >>>>>>>
> >>>>>>> On 27.04.20 12:12, David Anderson wrote:
> >>>>>>>> Overall I like this proposal; thanks for bringing it forward,
> >>>>>>>> Aljoscha.
> >>>>>>>>
> >>>>>>>> I also like the idea of making the Watermark generator a rich
> >>> function
> >>>>>> --
> >>>>>>>> this should make it more straightforward to implement smarter
> >>>>>>>> watermark
> >>>>>>>> generators. Eg, one that uses state to keep statistics about the
> >>>>>>>> actual
> >>>>>>>> out-of-orderness, and uses those statistics to implement a
> variable
> >>>>>>> delay.
> >>>>>>>> David
> >>>>>>>>
> >>>>>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <
> >> [hidden email]>
> >>>>>>> wrote:
> >>>>>>>>> Hi Aljoscha,
> >>>>>>>>>
> >>>>>>>>> Thanks for opening the discussion!
> >>>>>>>>>
> >>>>>>>>> I have two comments on the FLIP:
> >>>>>>>>> 1) we could add lifecycle methods to the Generator, i.e. open()/
> >>>>>>>>> close(), probably with a Context as argument: I have not fully
> >>>>>>>>> thought
> >>>>>>>>> this through but I think that this is more aligned with the rest
> >> of
> >>>>>>>>> our rich functions. In addition, it will allow, for example, to
> >>>>>>>>> initialize the Watermark value, if we decide to checkpoint the
> >>>>>>>>> watermark (see [1]) (I also do not know if Table/SQL needs to do
> >>>>>>>>> anything in the open()).
> >>>>>>>>> 2) aligned with the above, and with the case where we want to
> >>>>>>>>> checkpoint the watermark in mind, I am wondering about how we
> >> could
> >>>>>>>>> implement this in the future. In the FLIP, it is proposed to
> >> expose
> >>>>>>>>> the WatermarkOutput in the methods of the WatermarkGenerator.
> >> Given
> >>>>>>>>> that there is the implicit contract that watermarks are
> >>>>>>>>> non-decreasing, the WatermarkOutput#emitWatermark() will have (I
> >>>>>>>>> assume) a check that will compare the last emitted WM against the
> >>>>>>>>> provided one, and emit it only if it is >=. If not, then we risk
> >>>>>>>>> having the user shooting himself on the foot if he/she
> >> accidentally
> >>>>>>>>> forgets the check. Given that the WatermarkGenerator and its
> >>>>>>>>> caller do
> >>>>>>>>> not know if the watermark was finally emitted or not (the
> >>>>>>>>> WatermarkOutput#emitWatermark returns void), who will be
> >> responsible
> >>>>>>>>> for checkpointing the WM?
> >>>>>>>>>
> >>>>>>>>> Given this, why not having the methods as:
> >>>>>>>>>
> >>>>>>>>> public interface WatermarkGenerator<T> {
> >>>>>>>>>
> >>>>>>>>>       Watermark onEvent(T event, long eventTimestamp,
> >>> WatermarkOutput
> >>>>>>>>> output);
> >>>>>>>>>
> >>>>>>>>>       Watermark onPeriodicEmit(WatermarkOutput output);
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> and the caller will be the one enforcing any invariants, such as
> >>>>>>>>> non-decreasing watermarks. In this way, the caller can checkpoint
> >>>>>>>>> anything that is needed as it will have complete knowledge as to
> >> if
> >>>>>>>>> the WM was emitted or not.
> >>>>>>>>>
> >>>>>>>>> What do you think?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Kostas
> >>>>>>>>>
> >>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
> >>>>>>>>>
> >>>>>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <[hidden email]
> >
> >>>>>>> wrote:
> >>>>>>>>>> Thanks for the proposal Aljoscha. This is a very useful
> >>> unification.
> >>>>>> We
> >>>>>>>>>> have considered this FLIP already in the interfaces for FLIP-95
> >> [1]
> >>>>>> and
> >>>>>>>>>> look forward to update to the new unified watermark generators
> >> once
> >>>>>>>>>> FLIP-126 has been accepted.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>> [1] https://github.com/apache/flink/pull/11692
> >>>>>>>>>>
> >>>>>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> >>>>>>>>>>> Hi Everyone!
> >>>>>>>>>>>
> >>>>>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and
> >>>>>> separate)
> >>>>>>>>>>> Watermark Assigners" [1]. This work was started by Stephan in
> an
> >>>>>>>>>>> experimental branch. I expanded on that work to provide a PoC
> >> for
> >>>>>> the
> >>>>>>>>>>> changes proposed in this FLIP: [2].
> >>>>>>>>>>>
> >>>>>>>>>>> Currently, we have two different flavours of Watermark
> >>>>>>>>>>> Assigners: AssignerWithPunctuatedWatermarks
> >>>>>>>>>>> and AssignerWithPeriodicWatermarks. Both of them extend
> >>>>>>>>>>> from TimestampAssigner. This means that sources that want to
> >>>>>>>>>>> support
> >>>>>>>>>>> watermark assignment/extraction in the source need to support
> >> two
> >>>>>>>>>>> separate interfaces, we have two operator implementations for
> >> the
> >>>>>>>>>>> different flavours. Also, this makes features such as generic
> >>>>>> support
> >>>>>>>>>>> for idleness detection more complicated to implemented because
> >> we
> >>>>>>> again
> >>>>>>>>>>> have to support two types of watermark assigners.
> >>>>>>>>>>>
> >>>>>>>>>>> In this FLIP we propose two things:
> >>>>>>>>>>>
> >>>>>>>>>>> Unify the Watermark Assigners into one Interface
> >>> WatermarkGenerator
> >>>>>>>>>>> Separate this new interface from the TimestampAssigner
> >>>>>>>>>>> The motivation for the first is to simplify future
> >> implementations
> >>>>>> and
> >>>>>>>>>>> code duplication. The motivation for the second point is again
> >>> code
> >>>>>>>>>>> deduplication, most assigners currently have to extend from
> some
> >>>>>> base
> >>>>>>>>>>> timestamp extractor or duplicate the extraction logic, or users
> >>>>>>>>>>> have
> >>>>>>> to
> >>>>>>>>>>> override an abstract method of the watermark assigner to
> provide
> >>>>>>>>>>> the
> >>>>>>>>>>> timestamp extraction logic.
> >>>>>>>>>>>
> >>>>>>>>>>> Additionally, we propose to add a generic wrapping
> >>>>>> WatermarkGenerator
> >>>>>>>>>>> that provides idleness detection, i.e. it can mark a
> >>>>>> stream/partition
> >>>>>>>>> as
> >>>>>>>>>>> idle if no data arrives after a configured timeout.
> >>>>>>>>>>>
> >>>>>>>>>>> The "unify and separate" part refers to the fact that we want
> to
> >>>>>> unify
> >>>>>>>>>>> punctuated and periodic assigners but at the same time split
> the
> >>>>>>>>>>> timestamp assigner from the watermark generator.
> >>>>>>>>>>>
> >>>>>>>>>>> Please find more details in the FLIP [1]. Looking forward to
> >>>>>>>>>>> your feedback.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Aljoscha
> >>>>>>>>>>>
> >>>>>>>>>>> [1]
> >>>>>>>>>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> >>>>>>>>>>>
> >>>>>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
> >>>>>>>
> >>>
>
>
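The Java sketch below illustrates two points from the quoted messages: the "flatMap"-style contract, and Aljoscha's statement that the framework, not the user's generator, keeps watermarks from regressing. WatermarkOutput and WatermarkGenerator are simplified, hypothetical stand-ins, not the final FLIP-126 signatures. PerEventGenerator and MonotonicWatermarkOutput are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-ins for the interfaces discussed in this
// thread; the signatures that were finally adopted may differ.
interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    // Called per record; may emit zero or more watermarks ("flatMap" style).
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    // Called periodically by the framework; may also emit nothing.
    void onPeriodicEmit(WatermarkOutput output);
}

// Punctuated-style generator that emits on every event and deliberately does
// NOT track the maximum, so out-of-order events produce lower watermarks.
final class PerEventGenerator<T> implements WatermarkGenerator<T> {
    private final long bound;

    PerEventGenerator(long bound) {
        this.bound = bound;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        output.emitWatermark(eventTimestamp - bound);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Nothing to do: this generator only emits per event.
    }
}

// Framework-side output: it enforces that forwarded watermarks never
// decrease, and it knows the current watermark (the value a checkpoint
// would need).
final class MonotonicWatermarkOutput implements WatermarkOutput {
    final List<Long> forwarded = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;
    private boolean idle;

    @Override
    public void emitWatermark(long watermark) {
        idle = false;
        if (watermark > currentWatermark) {
            currentWatermark = watermark;
            forwarded.add(watermark);
        }
        // Smaller or equal watermarks are silently dropped.
    }

    @Override
    public void markIdle() {
        idle = true;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    boolean isIdle() {
        return idle;
    }
}

final class FlatMapStyleDemo {
    static List<Long> run() {
        WatermarkGenerator<String> gen = new PerEventGenerator<>(5);
        MonotonicWatermarkOutput out = new MonotonicWatermarkOutput();
        for (long ts : new long[] {10, 30, 20}) {
            gen.onEvent("event@" + ts, ts, out);
            gen.onPeriodicEmit(out);
        }
        return out.forwarded;
    }
}
```

For events 10, 30 and 20, the generator emits 5, 25 and 15. Only 5 and 25 are forwarded. Kostas's concern is also visible here. Because emitWatermark returns void, the generator never learns that 15 was dropped. Only the output knows which watermark is current.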

Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Aljoscha Krettek-2
Definitely +1 to point 2) raised by Dawid. I'm not sure about points 1) and 3).

1) I can see the benefit of that, but in reality most timestamp assigners
will probably need to be Serializable. If you look at my (updated) PoC
branch [1], you can see how a TimestampAssigner would be specified on the
WatermarkStrategies helper class [2]. The signature of this would have
to be changed to something like:

public <TA extends TimestampAssigner<T> & Serializable>
WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner)

Then, however, it would not be possible for users to specify a lambda or
an anonymous inner class for the TimestampAssigner like this:

WatermarkStrategy<Long> testWmStrategy = WatermarkStrategies
                .forGenerator(new PeriodicTestWatermarkGenerator())
                .withTimestampAssigner((event, timestamp) -> event)
                .build();
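The self-contained sketch below shows what the intersection-typed signature means for callers. TimestampAssigner and StrategyBuilder are hypothetical stand-ins; StrategyBuilder is not Flink's WatermarkStrategies. As far as I understand javac's typing rules, a lambda needs a functional interface as its target type. An inferred type variable bounded by `TimestampAssigner<T> & Serializable` does not provide one, so callers would need an explicit intersection cast.

```java
import java.io.Serializable;

// Hypothetical stand-ins, not Flink's actual classes.
@FunctionalInterface
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

final class StrategyBuilder<T> {
    private TimestampAssigner<T> assigner;

    // Only accepts assigners that are also Serializable.
    <TA extends TimestampAssigner<T> & Serializable> StrategyBuilder<T> withTimestampAssigner(
            TA timestampAssigner) {
        this.assigner = timestampAssigner;
        return this;
    }

    long assign(T element, long recordTimestamp) {
        return assigner.extractTimestamp(element, recordTimestamp);
    }
}

final class IntersectionTypeDemo {
    static long run() {
        StrategyBuilder<Long> builder = new StrategyBuilder<>();

        // Rejected by javac: a lambda needs a functional interface as its
        // target type, and the type variable TA does not provide one.
        // builder.withTimestampAssigner((event, timestamp) -> event);

        // Compiles, but every caller has to spell out an intersection cast.
        builder.withTimestampAssigner(
                (TimestampAssigner<Long> & Serializable) (event, timestamp) -> event);

        return builder.assign(42L, -1L);
    }
}
```

The cast works, but it is noisy at every call site and easy to get wrong. This is the usability cost Aljoscha is pointing at.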

3) This makes sense if we only allow WatermarkStrategies on sources,
where the previous timestamp really is the "native" timestamp.
Currently, we also allow setting watermark strategies at arbitrary
points in the graph. I'm thinking we probably should only allow that in
sources, but that's not the reality currently. I'm not against renaming it,
just voicing those thoughts.
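The sketch below shows what Dawid's renamings could look like: the recordTimestamp parameter and a RecordTimestampAssigner. It uses a simplified, hypothetical TimestampAssigner, and the method name extractTimestamp is an assumption here. The second argument is whatever timestamp the record already carries, whether it was set in a source or by an upstream operator. That is arguably why a neutral name like recordTimestamp fits both cases Aljoscha mentions.

```java
// Hypothetical stand-ins illustrating the proposed names; not Flink's API.
@FunctionalInterface
interface TimestampAssigner<T> {
    // recordTimestamp: the timestamp the record already carries (from the
    // source or from upstream), not processing time or the current watermark.
    long extractTimestamp(T element, long recordTimestamp);
}

// Keeps the record's existing timestamp unchanged. This is the behaviour the
// name "IdentityTimestampAssigner" obscured: it does not return the record.
final class RecordTimestampAssigner<T> implements TimestampAssigner<T> {
    @Override
    public long extractTimestamp(T element, long recordTimestamp) {
        return recordTimestamp;
    }
}

// Hypothetical event type with an embedded creation time.
final class Event {
    final String id;
    final long createdAt;

    Event(String id, long createdAt) {
        this.id = id;
        this.createdAt = createdAt;
    }
}

final class NamingDemo {
    static long[] run() {
        Event e = new Event("a", 1_000L);
        TimestampAssigner<Event> keepExisting = new RecordTimestampAssigner<>();
        TimestampAssigner<Event> fromPayload = (event, recordTimestamp) -> event.createdAt;
        return new long[] {
            keepExisting.extractTimestamp(e, 500L), // existing record timestamp
            fromPayload.extractTimestamp(e, 500L)   // taken from the payload
        };
    }
}
```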

Best,
Aljoscha


[1] https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
[2]
https://github.com/aljoscha/flink/blob/flink-xxx-wm-generators-rebased/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java#L81
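The quoted messages below describe the WatermarkStrategy as a factory that replaces an open() method. The sketch illustrates that pattern. GeneratorContext and its currentWatermarkGauge() method are hypothetical placeholders for the "metrics or other things" that would be passed to the factory method. Only the pattern itself comes from the discussion: create the generator with everything it needs, keep its fields final, and skip the lifecycle method. In the Table/SQL case, the factory method would presumably be where generated code is compiled before the generator is constructed.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified, hypothetical stand-ins; not the final FLIP-126 API.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical context handed to the factory (e.g. access to metrics).
interface GeneratorContext {
    AtomicLong currentWatermarkGauge();
}

// The strategy is a factory: everything an open() method would have set up
// is passed in when the generator is created.
@FunctionalInterface
interface WatermarkStrategy<T> {
    WatermarkGenerator<T> createWatermarkGenerator(GeneratorContext context);
}

final class AscendingGenerator<T> implements WatermarkGenerator<T> {
    private final AtomicLong gauge; // set once, at creation time
    private long maxTimestamp = Long.MIN_VALUE;

    AscendingGenerator(AtomicLong gauge) {
        this.gauge = gauge;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        gauge.set(maxTimestamp); // report the current watermark as a metric
        output.emitWatermark(maxTimestamp);
    }
}

final class FactoryDemo {
    static long run() {
        AtomicLong gauge = new AtomicLong();
        GeneratorContext ctx = () -> gauge;
        WatermarkStrategy<String> strategy =
                c -> new AscendingGenerator<String>(c.currentWatermarkGauge());

        WatermarkGenerator<String> gen = strategy.createWatermarkGenerator(ctx);
        gen.onEvent("x", 7L, w -> {});
        gen.onEvent("y", 3L, w -> {});
        gen.onPeriodicEmit(w -> {});
        return gauge.get();
    }
}
```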

On 12.05.20 15:48, Stephan Ewen wrote:

> +1 to all of Dawid's suggestions; it makes a lot of sense to me
>
> On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz <[hidden email]>
> wrote:
>
>> Hi Aljoscha,
>>
>> Sorry for adding comments during the vote, but I have some really minor
>> suggestions that should not influence the voting thread imo.
>>
>> 1) Does it make sense to have the TimestampAssigner extend from Flink's
>> Function? This implies it has to be serializable, which with the factory
>> pattern is not strictly necessary, right? BTW I really like that you
>> suggested the FunctionalInterface annotation there.
>>
>> 2) Could we rename the IdentityTimestampAssigner to e.g.
>> RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...
>> Personally, I found IdentityTimestampAssigner a bit misleading, as
>> "identity" usually means a no-op. It did not click for me, as I assumed
>> it somehow returns the incoming record itself.
>>
>> 3) Could we rename the second parameter of TimestampAssigner#extract to
>> e.g. recordTimestamp/nativeTimestamp? This is similar to the point
>> above. This parameter was also a bit confusing for me, as I thought at
>> times it's somehow related to
>> TimerService#currentProcessingTimestamp()/currentWatermark() as the
>> whole system's currentTimestamp.
>>
>> Other than those three points, I like the proposal, and I was about to
>> vote +1 if it were not for those three points.
>>
>> Best,
>>
>> Dawid
>>
>> On 11/05/2020 16:57, Jark Wu wrote:
>>> Thanks for the explanation. I like the factory pattern to make the
>>> member variables immutable and final.
>>>
>>> So +1 to the proposal.
>>>
>>> Best,
>>> Jark
>>>
>>> On Mon, 11 May 2020 at 22:01, Stephan Ewen <[hidden email]> wrote:
>>>
>>>> I am fine with that.
>>>>
>>>> Most of the principles seem agreed upon. I understand the need to
>>>> support code-generated extractors, and we should support most of it
>>>> already (via the factories, as Aljoscha mentioned) and can extend this
>>>> if needed.
>>>>
>>>> I think that the factory approach supports code-generated extractors in
>>>> an even cleaner way than an extractor with an open/init method.
>>>>
>>>>
>>>> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <[hidden email]>
>>>> wrote:
>>>>
>>>>> We're slightly running out of time. I would propose we vote on the
>>>>> basic principle and remain open to later additions. This feature is
>>>>> quite important to make the new Kafka Source that is developed as part
>>>>> of FLIP-27 useful. Otherwise we would have to use the legacy interfaces
>>>>> in the newly added connector.
>>>>>
>>>>> I know that's a bit unorthodox, but would everyone be OK with what's
>>>>> currently there, and then we iterate?
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On 11.05.20 13:57, Aljoscha Krettek wrote:
>>>>>> Ah, I meant to write this in my previous email, sorry about that.
>>>>>>
>>>>>> The WatermarkStrategy, which is basically a factory for a
>>>>>> WatermarkGenerator, is the replacement for the open() method. This is
>>>>>> the same strategy that was followed for StreamOperatorFactory, which
>>>>>> was introduced to allow code generation in the Table API [1]. If we
>>>>>> need metrics or other things, we would add that as a parameter to the
>>>>>> factory method. What do you think?
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11974
>>>>>>
>>>>>> On 10.05.20 05:07, Jark Wu wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> Regarding `open()/close()`, I think it's necessary for Table&SQL to
>>>>>>> compile the generated code. In Table&SQL, the watermark strategy and
>>>>>>> the event timestamp are defined using SQL expressions; we translate
>>>>>>> them and generate Java code for the expressions. If we have
>>>>>>> `open()/close()`, we don't need lazy initialization.
>>>>>>> Besides that, I can see a need to report some metrics, e.g. the
>>>>>>> current watermark, the dirty timestamps (null value), etc.
>>>>>>> So I think a simple `open()/close()` with a context which can get the
>>>>>>> MetricGroup is nice and not complex for the first version.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, 10 May 2020 at 00:50, Stephan Ewen <[hidden email]> wrote:
>>>>>>>
>>>>>>>> Thanks, Aljoscha, for picking this up.
>>>>>>>>
>>>>>>>> I agree with the approach of doing the set of changes proposed here
>>>>>>>> for now. It already makes things simpler and adds idleness support
>>>>>>>> everywhere.
>>>>>>>>
>>>>>>>> Rich functions and state always add complexity; let's do this in a
>>>>>>>> later step, if we have a really compelling case.
>
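The original proposal includes a generic wrapping WatermarkGenerator for idleness detection, which Stephan notes would add idleness support everywhere. It might look roughly like the sketch below. The sketch assumes simplified, hypothetical WatermarkOutput and WatermarkGenerator stand-ins. It also uses a hypothetical Clock so the timeout can be tested deterministically. It is an illustration of the idea, not the wrapper that ships with Flink.

```java
// Simplified, hypothetical stand-ins; not the final FLIP-126 API.
interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical clock abstraction so the timeout can be tested deterministically.
interface Clock {
    long nowMillis();
}

// Wraps any generator and marks the output idle once no event has arrived
// for idleTimeoutMillis; the next event makes it active again.
final class IdlenessDetectingGenerator<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> inner;
    private final Clock clock;
    private final long idleTimeoutMillis;
    private long lastEventTime;
    private boolean idle;

    IdlenessDetectingGenerator(WatermarkGenerator<T> inner, Clock clock, long idleTimeoutMillis) {
        this.inner = inner;
        this.clock = clock;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastEventTime = clock.nowMillis();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastEventTime = clock.nowMillis();
        idle = false;
        inner.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (clock.nowMillis() - lastEventTime >= idleTimeoutMillis) {
            if (!idle) { // signal idleness only once per quiet period
                idle = true;
                output.markIdle();
            }
            return;
        }
        inner.onPeriodicEmit(output);
    }
}

final class IdlenessDemo {
    static String run() {
        long[] now = {0L};
        StringBuilder log = new StringBuilder();
        WatermarkOutput out = new WatermarkOutput() {
            public void emitWatermark(long wm) { log.append("wm=").append(wm).append(' '); }
            public void markIdle() { log.append("idle "); }
        };
        WatermarkGenerator<Long> inner = new WatermarkGenerator<Long>() {
            private long max = Long.MIN_VALUE;
            public void onEvent(Long e, long ts, WatermarkOutput o) { max = Math.max(max, ts); }
            public void onPeriodicEmit(WatermarkOutput o) { o.emitWatermark(max); }
        };
        WatermarkGenerator<Long> gen = new IdlenessDetectingGenerator<>(inner, () -> now[0], 100);

        gen.onEvent(1L, 1L, out);                // event at t=0
        now[0] = 50;  gen.onPeriodicEmit(out);   // quiet for 50 ms: still active
        now[0] = 150; gen.onPeriodicEmit(out);   // quiet for 150 ms: marked idle
        now[0] = 200; gen.onPeriodicEmit(out);   // already idle: nothing emitted
        gen.onEvent(2L, 2L, out);                // new event at t=200
        now[0] = 210; gen.onPeriodicEmit(out);   // active again
        return log.toString().trim();
    }
}
```

Because the wrapper relies only on the unified interface, a single implementation covers both formerly periodic and formerly punctuated generators. This is one of the motivations given for the unification.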


Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Stephan Ewen
@Aljoscha

About (1): could we have an interface SerializableTimestampAssigner that
simply mixes in the java.io.Serializable interface? Or would this be too
clumsy?

About (3): RecordTimestamp seems to fit both cases (in-source-record
timestamp, in stream-record timestamp).
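The sketch below shows the mix-in Stephan describes, using hypothetical stand-in interfaces. SerializableTimestampAssigner is itself a functional interface that extends Serializable, so a plain lambda can target it. Java serialization of that lambda then succeeds. The same lambda typed as the plain TimestampAssigner is not serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the timestamp assigner interface.
@FunctionalInterface
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// The mix-in: no new methods, only the Serializable marker. It is still a
// functional interface, so a plain lambda can target it directly.
@FunctionalInterface
interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {}

final class SerializableAssignerDemo {
    // True if standard Java serialization accepts the object.
    static boolean isJavaSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static boolean[] run() {
        // No intersection cast needed.
        SerializableTimestampAssigner<Long> serializable = (event, recordTimestamp) -> event;
        // Same lambda with a plain target type: its class is not Serializable.
        TimestampAssigner<Long> plain = (event, recordTimestamp) -> event;
        return new boolean[] {isJavaSerializable(serializable), isJavaSerializable(plain)};
    }
}
```

A WatermarkStrategies-style builder could then declare its parameter as `SerializableTimestampAssigner<T>`. A lambda like the one in Aljoscha's example would compile without a cast and still be serializable.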

On Tue, May 12, 2020 at 4:12 PM Aljoscha Krettek <[hidden email]>
wrote:

> Definitely +1 to point 2) raised by Dawid. I'm not sure on points 1) and
> 3).
>
> 1) I can see the benefit of that but in reality most timestamp assigners
> will probably need to be Serializable. If you look at my (updated) POC
> branch [1] you can see how a TimestampAssigner would be specified on the
> WatermarkStrategies helper class: [2]. The signature of this would have
> to be changed to something like:
>
> public <TA extends TimestampAssigner<T> & Serializable>
> WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner)
>
> Then, however, it would not be possible for users to specify a lambda or
> anonymous inner function for the TimestampAssigner like this:
>
> WatermarkStrategy<Long> testWmStrategy = WatermarkStrategies
>                 .forGenerator(new PeriodicTestWatermarkGenerator())
>                 .withTimestampAssigner((event, timestamp) -> event)
>                 .build();
>
> 3) This makes sense if we only allow WatermarkStrategies on sources,
> where the previous timestamp really is the "native" timestamp.
> Currently, we also allow setting watermark strategies at arbitrary
> points in the graph. I'm thinking we probably should only allow that in
> sources but it's not the reality currently. I'm not against renaming it,
> just voicing those thoughts.
>
> Best,
> Aljoscha
>
>
> [1] https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
> [2]
>
> https://github.com/aljoscha/flink/blob/flink-xxx-wm-generators-rebased/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java#L81
>
> On 12.05.20 15:48, Stephan Ewen wrote:
> > +1 to all of Dawid's suggestions, makes a lot of sense to me
> >
> > On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz <[hidden email]
> >
> > wrote:
> >
> >> Hi Aljoscha,
> >>
> >> Sorry for adding comments during the vote, but I have some really minor
> >> suggestions that should not influence the voting thread imo.
> >>
> >> 1) Does it make sense to have the TimestampAssigner extend from Flink's
> >> Function? This implies it has to be serializable which with the factory
> >> pattern is not strictly necessary, right? BTW I really like that you
> >> suggested the FunctionInterface annotation there.
> >>
> >> 2) Could we rename the IdentityTimestampAssigner to e.g.
> >>
> RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...
> >> Personally I found the IdentityTimestampAssigner a bit misleading as it
> >> usually mean a no-op. Which did not click for me, as I assumed it
> >> somehow returns the incoming record itself.
> >>
> >> 3) Could we rename the second parameter of TimestampAssigner#extract to
> >> e.g. recordTimestamp/nativeTimestamp? This is similar to the point
> >> above. This parameter was also a bit confusing for me as I thought at
> >> times its somehow related to
> >> TimerService#currentProcessingTimestamp()/currentWatermark() as the
> >> whole system currentTimestamp.
> >>
> >> Other than those three points I like the proposal and I was about to
> >> vote +1 if it was not for those three points.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 11/05/2020 16:57, Jark Wu wrote:
> >>> Thanks for the explanation. I like the fatory pattern to make the
> member
> >>> variables immutable and final.
> >>>
> >>> So +1 to the proposal.
> >>>
> >>> Best,
> >>> Jark
> >>>
> >>> On Mon, 11 May 2020 at 22:01, Stephan Ewen <[hidden email]> wrote:
> >>>
> >>>> I am fine with that.
> >>>>
> >>>> Much of the principles seem agreed upon. I understand the need to
> >> support
> >>>> code-generated extractors and we should support most of it already (as
> >>>> Aljoscha mentioned via the factories) can extend this if needed.
> >>>>
> >>>> I think that the factory approach supports code-generated extractors
> in
> >> a
> >>>> cleaner way even than an extractor with an open/init method.
> >>>>
> >>>>
> >>>> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <[hidden email]
> >
> >>>> wrote:
> >>>>
> >>>>> We're slightly running out of time. I would propose we vote on the
> >> basic
> >>>>> principle and remain open to later additions. This feature is quite
> >>>>> important to make the new Kafka Source that is developed as part of
> >>>>> FLIP-27 useful. Otherwise we would have to use the legacy interfaces
> in
> >>>>> the newly added connector.
> >>>>>
> >>>>> I know that's a bit unorthodox but would everyone be OK with what's
> >>>>> currently there and then we iterate?
> >>>>>
> >>>>> Best,
> >>>>> Aljoscha
> >>>>>
> >>>>> On 11.05.20 13:57, Aljoscha Krettek wrote:
> >>>>>> Ah, I meant to write this in my previous email, sorry about that.
> >>>>>>
> >>>>>> The WatermarkStrategy, which is basically a factory for a
> >>>>>> WatermarkGenerator is the replacement for the open() method. This is
> >>>> the
> >>>>>> same strategy that was followed for StreamOperatorFactory, which was
> >>>>>> introduced to allow code generation in the Table API [1]. If we need
> >>>>>> metrics or other things we would add that as a parameter to the
> >> factory
> >>>>>> method. What do you think?
> >>>>>>
> >>>>>> Best,
> >>>>>> Aljoscha
> >>>>>>
> >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11974
> >>>>>>
> >>>>>> On 10.05.20 05:07, Jark Wu wrote:
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Regarding to the `open()/close()`, I think it's necessary for
> >>>>>>> Table&SQL to
> >>>>>>> compile the generated code.
> >>>>>>> In Table&SQL, the watermark strategy and event-timestamp is defined
> >>>>> using
> >>>>>>> SQL expressions, we will
> >>>>>>> translate and generate Java code for the expressions. If we have
> >>>>>>> `open()/close()`, we don't need lazy initialization.
> >>>>>>> Besides that, I can see a need to report some metrics, e.g. the
> >>>> current
> >>>>>>> watermark, the dirty timestamps (null value), etc.
> >>>>>>> So I think a simple `open()/close()` with a context which can get
> >>>>>>> MetricGroup is nice and not complex for the first version.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jark
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Sun, 10 May 2020 at 00:50, Stephan Ewen <[hidden email]>
> wrote:
> >>>>>>>
> >>>>>>>> Thanks, Aljoscha, for picking this up.
> >>>>>>>>
> >>>>>>>> I agree with the approach of doing the here proposed set of
> changes
> >>>> for
> >>>>>>>> now. It already makes things simpler and adds idleness support
> >>>>>>>> everywhere.
> >>>>>>>>
> >>>>>>>> Rich functions and state always add complexity, let's do this in a
> >>>> next
> >>>>>>>> step, if we have a really compelling case.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <
> >>>> [hidden email]>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Regarding the WatermarkGenerator (WG) interface itself. The
> >> proposal
> >>>>> is
> >>>>>>>>> basically to turn emitting into a "flatMap", we give the
> >>>>>>>>> WatermarkGenerator a "collector" (the WatermarkOutput) and the WG
> >>>> can
> >>>>>>>>> decide whether to output a watermark or not and can also mark the
> >>>>>>>>> output
> >>>>>>>>> as idle. Changing the interface to return a Watermark (as the
> >>>> previous
> >>>>>>>>> watermark assigner interface did) would not allow that
> flexibility.
> >>>>>>>>>
> >>>>>>>>> Regarding checkpointing the watermark and keeping track of the
> >>>> minimum
> >>>>>>>>> watermark, this would be the responsibility of the framework (or
> >> the
> >>>>>>>>> KafkaConsumer in the current implementation). The user-supplied
> WG
> >>>>> does
> >>>>>>>>> not need to make sure the watermark doesn't regress.
> >>>>>>>>>
> >>>>>>>>> Regarding making the WG a "rich function", I can see the
> potential
> >>>>>>>>> benefit but I also see a lot of pitfalls. For example, how should
> >>>> the
> >>>>>>>>> watermark state be handled in the case of scale-in? It could be
> >> made
> >>>>> to
> >>>>>>>>> work in the Kafka case by attaching the state to the partition
> >> state
> >>>>>>>>> that we keep, but then we have potential backwards compatibility
> >>>>>>>>> problems also for the WM state. Does the WG usually need to keep
> >> the
> >>>>>>>>> state or might it be enough if the state is transient, i.e. if
> you
> >>>>> have
> >>>>>>>>> a restart the WG would loose its histogram but it would rebuild
> it
> >>>>>>>>> quickly and you would get back to the same steady state as
> before.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Aljoscha

Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

dwysakowicz
I have similar thoughts to @Stephan.

Ad 1. I tried something like this on your branch:

    /**
     * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
     * For top-level classes that implement both Serializable and TimestampAssigner.
     */
    public <TA extends TimestampAssigner<T> & Serializable> WatermarkStrategies<T> withTimestampAssigner(
            TA timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        this.timestampAssigner = timestampAssigner;
        return this;
    }

    @FunctionalInterface
    public interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {
    }

    /**
     * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
     * Helper method for serializable lambdas.
     */
    public WatermarkStrategies<T> withTimestampAssigner(
            SerializableTimestampAssigner<T> timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        this.timestampAssigner = timestampAssigner;
        return this;
    }

But I understand if that's too hacky. It's just a pity that we have to
impose restrictions on an interface that are not strictly necessary.
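A minimal, self-contained sketch of why the SerializableTimestampAssigner mix-in helps. It does not use Flink: TimestampAssigner, SerializableTimestampAssigner and StrategyBuilder below are simplified stand-ins (StrategyBuilder is a hypothetical substitute for WatermarkStrategies), and the extractTimestamp(element, recordTimestamp) signature is an assumption modeled on this thread. Because the parameter type is a functional interface that also extends Serializable, a plain lambda compiles without a cast and survives Java serialization; with only the generic TA-extends-TimestampAssigner-and-Serializable bound, callers would instead have to write an intersection cast.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

// Hypothetical stand-in for Flink's TimestampAssigner (not Serializable on its own).
@FunctionalInterface
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// The mix-in from the thread: same single abstract method, plus java.io.Serializable.
@FunctionalInterface
interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {}

// Hypothetical, heavily simplified stand-in for the WatermarkStrategies builder.
final class StrategyBuilder<T> {
    private TimestampAssigner<T> timestampAssigner;

    // A lambda whose target type extends Serializable is compiled as a serializable lambda.
    StrategyBuilder<T> withTimestampAssigner(SerializableTimestampAssigner<T> assigner) {
        this.timestampAssigner = Objects.requireNonNull(assigner, "timestampAssigner");
        return this;
    }

    TimestampAssigner<T> timestampAssigner() {
        return timestampAssigner;
    }
}

final class SerializableLambdaDemo {
    // Java-serializes a value and reads it back, roughly what shipping a function to a task does.
    @SuppressWarnings("unchecked")
    static <X> X roundTrip(X value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (X) in.readObject();
        }
    }

    // Builds the assigner from a plain lambda (no cast), round-trips it, then applies it.
    static long extractAfterRoundTrip(long value) {
        StrategyBuilder<Long> builder = new StrategyBuilder<Long>()
                .withTimestampAssigner((element, recordTimestamp) -> element);
        try {
            TimestampAssigner<Long> copy = roundTrip(builder.timestampAssigner());
            return copy.extractTimestamp(value, Long.MIN_VALUE);
        } catch (IOException | ClassNotFoundException ex) {
            throw new IllegalStateException("serialization round trip failed", ex);
        }
    }

    // As noted in the thread, a generic <TA extends TimestampAssigner<T> & Serializable>
    // parameter does not accept a bare lambda, so callers would need a cast like this one.
    static TimestampAssigner<Long> withIntersectionCast() {
        return (TimestampAssigner<Long> & Serializable) (element, recordTimestamp) -> element;
    }
}
```

For example, SerializableLambdaDemo.extractAfterRoundTrip(42L) builds the assigner from a lambda, serializes and deserializes it, and returns 42. The trade-off is one extra interface in the API in exchange for lambdas working without casts at the call site.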

Ad 2/3.

I am aware that the watermark assigner/timestamp extractor can be applied
further down the graph. Originally I also wanted to suggest
sourceTimestamp and SourceTimestampAssigner, but then I realized it can
also be used after the sources, as you correctly pointed out. Even if the
TimestampAssigner is used after the source, there might be a
native/record timestamp in the StreamRecord that could have been
extracted by a previous assigner.
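To illustrate why "recordTimestamp" fits both cases, here is a small sketch without Flink. TimestampAssigner, TimestampedRecord, Event and AssignerChain are hypothetical stand-ins, and the 500/1000 values are made up. One assigner extracts a timestamp from the payload; a later assigner only sees that value as its recordTimestamp. The RecordTimestampAssigner here simply forwards the existing record timestamp, which is roughly what the renamed IdentityTimestampAssigner discussed in this thread would do.

```java
// Hypothetical stand-in for Flink's TimestampAssigner, using the "recordTimestamp" name.
@FunctionalInterface
interface TimestampAssigner<T> {
    /**
     * @param element         the event
     * @param recordTimestamp the timestamp already attached to the record (set by the source
     *                        or by an earlier assigner)
     */
    long extractTimestamp(T element, long recordTimestamp);
}

// Hypothetical stand-in for a StreamRecord: a value plus the timestamp currently attached to it.
final class TimestampedRecord<T> {
    final T value;
    final long timestamp;

    TimestampedRecord(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical event type carrying its own event time in the payload.
final class Event {
    final long eventTime;

    Event(long eventTime) {
        this.eventTime = eventTime;
    }
}

// Forwards whatever timestamp the record already carries instead of computing a new one.
final class RecordTimestampAssigner<T> implements TimestampAssigner<T> {
    @Override
    public long extractTimestamp(T element, long recordTimestamp) {
        return recordTimestamp;
    }
}

final class AssignerChain {
    // Applies an assigner the way an operator might: the current record timestamp goes in as
    // recordTimestamp, and the result becomes the record's new timestamp.
    static <T> TimestampedRecord<T> apply(TimestampAssigner<T> assigner, TimestampedRecord<T> record) {
        long newTimestamp = assigner.extractTimestamp(record.value, record.timestamp);
        return new TimestampedRecord<>(record.value, newTimestamp);
    }

    // Returns the record timestamp at each stage: {at source, after extraction, after forwarding}.
    static long[] demo() {
        // Illustrative values: the source attached 500, the payload says 1000.
        TimestampedRecord<Event> fromSource = new TimestampedRecord<>(new Event(1_000L), 500L);
        // At the source: take the timestamp from the payload and ignore the record timestamp.
        TimestampedRecord<Event> extracted =
                apply((event, recordTimestamp) -> event.eventTime, fromSource);
        // Further down the graph: recordTimestamp is now the value the previous assigner produced.
        TimestampedRecord<Event> forwarded = apply(new RecordTimestampAssigner<>(), extracted);
        return new long[] {fromSource.timestamp, extracted.timestamp, forwarded.timestamp};
    }
}
```

AssignerChain.demo() returns {500, 1000, 1000}: after the first assigner runs, the "native" source timestamp is gone, and a downstream assigner only sees the previously assigned value. That is why a name tied to the source (sourceTimestamp) would be misleading there.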

Best,

Dawid

On 12/05/2020 16:47, Stephan Ewen wrote:

> @Aljoscha
>
> About (1) could we have an interface SerializableTimestampAssigner that
> simply mixes in the java.io.Serializable interface? Or will this be too
> clumsy?
>
> About (3) RecordTimeStamp seems to fit both cases (in-source-record
> timestamp, in stream-record timestamp).
>
> On Tue, May 12, 2020 at 4:12 PM Aljoscha Krettek <[hidden email]>
> wrote:
>
>> Definitely +1 to point 2) raised by Dawid. I'm not sure on points 1) and
>> 3).
>>
>> 1) I can see the benefit of that but in reality most timestamp assigners
>> will probably need to be Serializable. If you look at my (updated) POC
>> branch [1] you can see how a TimestampAssigner would be specified on the
>> WatermarkStrategies helper class: [2]. The signature of this would have
>> to be changed to something like:
>>
>> public <TA extends TimestampAssigner<T> & Serializable>
>> WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner)
>>
>> Then, however, it would not be possible for users to specify a lambda or
>> anonymous inner function for the TimestampAssigner like this:
>>
>> WatermarkStrategy<Long> testWmStrategy = WatermarkStrategies
>>                 .forGenerator(new PeriodicTestWatermarkGenerator())
>>                 .withTimestampAssigner((event, timestamp) -> event)
>>                 .build();
>>
>> 3) This makes sense if we only allow WatermarkStrategies on sources,
>> where the previous timestamp really is the "native" timestamp.
>> Currently, we also allow setting watermark strategies at arbitrary
>> points in the graph. I'm thinking we probably should only allow that in
>> sources but it's not the reality currently. I'm not against renaming it,
>> just voicing those thoughts.
>>
>> Best,
>> Aljoscha
>>
>>
>> [1] https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
>> [2] https://github.com/aljoscha/flink/blob/flink-xxx-wm-generators-rebased/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java#L81
>>
>> On 12.05.20 15:48, Stephan Ewen wrote:
>>> +1 to all of Dawid's suggestions, makes a lot of sense to me
>>>
>>> On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz <[hidden email]>
>>> wrote:
>>>
>>>> Hi Aljoscha,
>>>>
>>>> Sorry for adding comments during the vote, but I have some really minor
>>>> suggestions that should not influence the voting thread imo.
>>>>
>>>> 1) Does it make sense to have the TimestampAssigner extend from Flink's
>>>> Function? This implies it has to be serializable which with the factory
>>>> pattern is not strictly necessary, right? BTW I really like that you
>>>> suggested the FunctionalInterface annotation there.
>>>>
>>>> 2) Could we rename the IdentityTimestampAssigner to e.g.
>>>> RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...
>>>> Personally I found the IdentityTimestampAssigner a bit misleading, as
>>>> "identity" usually means a no-op. That did not click for me, as I assumed
>>>> it somehow returns the incoming record itself.
>>>>
>>>> 3) Could we rename the second parameter of TimestampAssigner#extract to
>>>> e.g. recordTimestamp/nativeTimestamp? This is similar to the point
>>>> above. This parameter was also a bit confusing for me, as I thought at
>>>> times it's somehow related to
>>>> TimerService#currentProcessingTimestamp()/currentWatermark() as the
>>>> whole system currentTimestamp.
>>>>
>>>> Other than those three points I like the proposal and I was about to
>>>> vote +1 if it was not for those three points.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 11/05/2020 16:57, Jark Wu wrote:
>>>>> Thanks for the explanation. I like the factory pattern to make the
>>>>> member variables immutable and final.
>>>>>
>>>>> So +1 to the proposal.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Mon, 11 May 2020 at 22:01, Stephan Ewen <[hidden email]> wrote:
>>>>>
>>>>>> I am fine with that.
>>>>>>
>>>>>> Most of the principles seem agreed upon. I understand the need to
>>>>>> support code-generated extractors, and we should support most of it
>>>>>> already (as Aljoscha mentioned, via the factories) and can extend this
>>>>>> if needed.
>>>>>>
>>>>>> I think that the factory approach supports code-generated extractors
>>>>>> in a cleaner way even than an extractor with an open/init method.
>>>>>>
>>>>>>
>>>>>> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <[hidden email]>
>>>>>> wrote:
>>>>>>
>>>>>>> We're slightly running out of time. I would propose we vote on the
>>>>>>> basic principle and remain open to later additions. This feature is
>>>>>>> quite important to make the new Kafka Source that is developed as part
>>>>>>> of FLIP-27 useful. Otherwise we would have to use the legacy
>>>>>>> interfaces in the newly added connector.
>>>>>>>
>>>>>>> I know that's a bit unorthodox but would everyone be OK with what's
>>>>>>> currently there and then we iterate?
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On 11.05.20 13:57, Aljoscha Krettek wrote:
>>>>>>>> Ah, I meant to write this in my previous email, sorry about that.
>>>>>>>>
>>>>>>>> The WatermarkStrategy, which is basically a factory for a
>>>>>>>> WatermarkGenerator, is the replacement for the open() method. This is
>>>>>>>> the same strategy that was followed for StreamOperatorFactory, which
>>>>>>>> was introduced to allow code generation in the Table API [1]. If we
>>>>>>>> need metrics or other things, we would add them as parameters to the
>>>>>>>> factory method. What do you think?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11974
>>>>>>>>
>>>>>>>> On 10.05.20 05:07, Jark Wu wrote:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Regarding `open()/close()`, I think they are necessary for Table&SQL
>>>>>>>>> to compile the generated code. In Table&SQL, the watermark strategy
>>>>>>>>> and the event timestamp are defined using SQL expressions, which we
>>>>>>>>> translate into generated Java code. If we have `open()/close()`, we
>>>>>>>>> don't need lazy initialization.
>>>>>>>>> Besides that, I can see a need to report some metrics, e.g. the
>>>>>>>>> current watermark, the dirty timestamps (null value), etc.
>>>>>>>>> So I think a simple `open()/close()` with a context which can get the
>>>>>>>>> MetricGroup is nice and not complex for the first version.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, 10 May 2020 at 00:50, Stephan Ewen <[hidden email]> wrote:
>>>>>>>>>> Thanks, Aljoscha, for picking this up.
>>>>>>>>>>
>>>>>>>>>> I agree with the approach of doing the set of changes proposed here
>>>>>>>>>> for now. It already makes things simpler and adds idleness support
>>>>>>>>>> everywhere.
>>>>>>>>>>
>>>>>>>>>> Rich functions and state always add complexity; let's do this in a
>>>>>>>>>> next step, if we have a really compelling case.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <[hidden email]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Regarding the WatermarkGenerator (WG) interface itself: the proposal
>>>>>>>>>>> is basically to turn emitting into a "flatMap". We give the
>>>>>>>>>>> WatermarkGenerator a "collector" (the WatermarkOutput), and the WG
>>>>>>>>>>> can decide whether to output a watermark or not and can also mark
>>>>>>>>>>> the output as idle. Changing the interface to return a Watermark (as
>>>>>>>>>>> the previous watermark assigner interface did) would not allow that
>>>>>>>>>>> flexibility.
>>>>>>>>>>>
>>>>>>>>>>> Regarding checkpointing the watermark and keeping track of the
>>>>>>>>>>> minimum watermark, this would be the responsibility of the framework
>>>>>>>>>>> (or the KafkaConsumer in the current implementation). The
>>>>>>>>>>> user-supplied WG does not need to make sure the watermark doesn't
>>>>>>>>>>> regress.
>>>>>>>>>>>
>>>>>>>>>>> Regarding making the WG a "rich function", I can see the potential
>>>>>>>>>>> benefit but I also see a lot of pitfalls. For example, how should the
>>>>>>>>>>> watermark state be handled in the case of scale-in? It could be made
>>>>>>>>>>> to work in the Kafka case by attaching the state to the partition
>>>>>>>>>>> state that we keep, but then we have potential backwards
>>>>>>>>>>> compatibility problems also for the WM state. Does the WG usually
>>>>>>>>>>> need to keep the state, or might it be enough if the state is
>>>>>>>>>>> transient, i.e. if you have a restart the WG would lose its histogram
>>>>>>>>>>> but it would rebuild it quickly and you would get back to the same
>>>>>>>>>>> steady state as before?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Aljoscha
>>>>>>>>>>>
>>>>>>>>>>> On 27.04.20 12:12, David Anderson wrote:
>>>>>>>>>>>> Overall I like this proposal; thanks for bringing it forward,
>>>>>>>>>>>> Aljoscha.
>>>>>>>>>>>>
>>>>>>>>>>>> I also like the idea of making the watermark generator a rich
>>>>>>>>>>>> function; this should make it more straightforward to implement
>>>>>>>>>>>> smarter watermark generators, e.g. one that uses state to keep
>>>>>>>>>>>> statistics about the actual out-of-orderness and uses those
>>>>>>>>>>>> statistics to implement a variable delay.
>>>>>>>>>>>> David
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <[hidden email]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for opening the discussion!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have two comments on the FLIP:
>>>>>>>>>>>>> 1) We could add lifecycle methods to the Generator, i.e. open()/
>>>>>>>>>>>>> close(), probably with a Context as argument. I have not fully
>>>>>>>>>>>>> thought this through, but I think that this is more aligned with
>>>>>>>>>>>>> the rest of our rich functions. In addition, it would allow us, for
>>>>>>>>>>>>> example, to initialize the watermark value if we decide to
>>>>>>>>>>>>> checkpoint the watermark (see [1]). (I also do not know if
>>>>>>>>>>>>> Table/SQL needs to do anything in open().)
>>>>>>>>>>>>> 2) Aligned with the above, and with the case where we want to
>>>>>>>>>>>>> checkpoint the watermark in mind, I am wondering how we could
>>>>>>>>>>>>> implement this in the future. In the FLIP, it is proposed to
>>>>>>>>>>>>> expose the WatermarkOutput in the methods of the
>>>>>>>>>>>>> WatermarkGenerator. Given the implicit contract that watermarks
>>>>>>>>>>>>> are non-decreasing, WatermarkOutput#emitWatermark() will (I
>>>>>>>>>>>>> assume) have a check that compares the last emitted WM against
>>>>>>>>>>>>> the provided one and emits it only if it is >=. If there is no
>>>>>>>>>>>>> such check, we risk users shooting themselves in the foot if they
>>>>>>>>>>>>> accidentally forget it. Given that the WatermarkGenerator and its
>>>>>>>>>>>>> caller do not know whether the watermark was finally emitted or
>>>>>>>>>>>>> not (WatermarkOutput#emitWatermark returns void), who will be
>>>>>>>>>>>>> responsible for checkpointing the WM?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Given this, why not have the methods as:
>>>>>>>>>>>>>
>>>>>>>>>>>>> public interface WatermarkGenerator<T> {
>>>>>>>>>>>>>
>>>>>>>>>>>>>        Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
>>>>>>>>>>>>>
>>>>>>>>>>>>>        Watermark onPeriodicEmit(WatermarkOutput output);
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> and the caller will be the one enforcing any invariants, such as
>>>>>>>>>>>>> non-decreasing watermarks. In this way, the caller can checkpoint
>>>>>>>>>>>>> anything that is needed, as it will have complete knowledge of
>>>>>>>>>>>>> whether the WM was emitted or not.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <[hidden email]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> Thanks for the proposal, Aljoscha. This is a very useful
>>>>>>>>>>>>>> unification. We have already considered this FLIP in the
>>>>>>>>>>>>>> interfaces for FLIP-95 [1] and look forward to updating to the
>>>>>>>>>>>>>> new unified watermark generators once FLIP-126 has been accepted.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://github.com/apache/flink/pull/11692
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
>>>>>>>>>>>>>>> Hi Everyone!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and
>>>>>>>>>>>>>>> separate) Watermark Assigners" [1]. This work was started by
>>>>>>>>>>>>>>> Stephan in an experimental branch. I expanded on that work to
>>>>>>>>>>>>>>> provide a PoC for the changes proposed in this FLIP: [2].
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Currently, we have two different flavours of Watermark
>>>>>>>>>>>>>>> Assigners: AssignerWithPunctuatedWatermarks and
>>>>>>>>>>>>>>> AssignerWithPeriodicWatermarks. Both of them extend from
>>>>>>>>>>>>>>> TimestampAssigner. This means that sources that want to support
>>>>>>>>>>>>>>> watermark assignment/extraction in the source need to support
>>>>>>>>>>>>>>> two separate interfaces, and we have two operator implementations
>>>>>>>>>>>>>>> for the different flavours. Also, this makes features such as
>>>>>>>>>>>>>>> generic support for idleness detection more complicated to
>>>>>>>>>>>>>>> implement because we again have to support two types of
>>>>>>>>>>>>>>> watermark assigners.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In this FLIP we propose two things:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1) Unify the Watermark Assigners into one interface,
>>>>>>>>>>>>>>>    WatermarkGenerator.
>>>>>>>>>>>>>>> 2) Separate this new interface from the TimestampAssigner.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The motivation for the first is to simplify future
>>>>>>>>>>>>>>> implementations and reduce code duplication. The motivation for
>>>>>>>>>>>>>>> the second point is again code deduplication: most assigners
>>>>>>>>>>>>>>> currently have to extend from some base timestamp extractor or
>>>>>>>>>>>>>>> duplicate the extraction logic, or users have to override an
>>>>>>>>>>>>>>> abstract method of the watermark assigner to provide the
>>>>>>>>>>>>>>> timestamp extraction logic.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Additionally, we propose to add a generic wrapping
>>>>>>>>>>>>>>> WatermarkGenerator that provides idleness detection, i.e. it can
>>>>>>>>>>>>>>> mark a stream/partition as idle if no data arrives after a
>>>>>>>>>>>>>>> configured timeout.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The "unify and separate" part refers to the fact that we want to
>>>>>>>>>>>>>>> unify punctuated and periodic assigners but at the same time
>>>>>>>>>>>>>>> split the timestamp assigner from the watermark generator.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please find more details in the FLIP [1]. Looking forward to
>>>>>>>>>>>>>>> your feedback.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
>>>>>>>>>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
>>>>
>>



Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Aljoscha Krettek-2
Yes, I am also OK with a SerializableTimestampAssigner. It looks a bit
clumsy in the API, but users who pass lambdas should not notice it. I
pushed changes for this to my branch:
https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased

And yes, recordTimestamp sounds good for the TimestampAssigner. I admit
I didn't read this well enough and only saw nativeTimestamp.

Best,
Aljoscha

On 12.05.20 17:16, Dawid Wysakowicz wrote:

> I have similar thoughts to @Stephan
>
> Ad. 1 I tried something like this on your branch:
>
>      /**
>       * Adds the given {@link TimestampAssigner} to this {@link
> WatermarkStrategies}. For top-level classes that implement both
> Serializable and TimestampAssigner
>       */
>      public <TA extends TimestampAssigner<T> & Serializable>
> WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner) {
>          checkNotNull(timestampAssigner, "timestampAssigner");
>          this.timestampAssigner = timestampAssigner;
>          return this;
>      }
>
>     @FunctionalInterface
>      public interface SerializableTimestampAssigner<T> extends
> TimestampAssigner<T>, Serializable {
>      }
>
>       /**
>        * Adds the given {@link TimestampAssigner} to this {@link
> WatermarkStrategies}.
>       * Helper method for serializable lambdas.
>       */
>      public WatermarkStrategies<T>
> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
>          checkNotNull(timestampAssigner, "timestampAssigner");
>          this.timestampAssigner = timestampAssigner;
>          return this;
>      }
>
> But I understand if that's too hacky. It's just a pity that we must
> enforce limitations on an interface that are not strictly necessary.
>
> Ad 2/3
>
> I am aware the watermark assigner/timestamp extractor can be applied
> further down the graph. Originally I also wanted to suggest
> sourceTimestamp and SourceTimestampAssigner, but then I realized it can
> be used also after the sources as you correctly pointed out. Even if the
> TimestampAssigner is used after the source there might be some
> native/record timestamp in the StreamRecord, that could've been
> extracted by previous assigner.
>
> Best,
>
> Dawid
>
> On 12/05/2020 16:47, Stephan Ewen wrote:
>> @Aljoscha
>>
>> About (1) could we have an interface SerializableTimestampAssigner that
>> simply mixes in the java.io.Serializable interface? Or will this be too
>> clumsy?
>>
>> About (3) RecordTimeStamp seems to fit both cases (in-source-record
>> timestamp, in stream-record timestamp).
>>
>> On Tue, May 12, 2020 at 4:12 PM Aljoscha Krettek <[hidden email]>
>> wrote:
>>
>>> Definitely +1 to point 2) raised by Dawid. I'm not sure on points 1) and
>>> 3).
>>>
>>> 1) I can see the benefit of that but in reality most timestamp assigners
>>> will probably need to be Serializable. If you look at my (updated) POC
>>> branch [1] you can see how a TimestampAssigner would be specified on the
>>> WatermarkStrategies helper class: [2]. The signature of this would have
>>> to be changed to something like:
>>>
>>> public <TA extends TimestampAssigner<T> & Serializable>
>>> WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner)
>>>
>>> Then, however, it would not be possible for users to specify a lambda or
>>> anonymous inner function for the TimestampAssigner like this:
>>>
>>> WatermarkStrategy<Long> testWmStrategy = WatermarkStrategies
>>>                  .forGenerator(new PeriodicTestWatermarkGenerator())
>>>                  .withTimestampAssigner((event, timestamp) -> event)
>>>                  .build();
>>>
>>> 3) This makes sense if we only allow WatermarkStrategies on sources,
>>> where the previous timestamp really is the "native" timestamp.
>>> Currently, we also allow setting watermark strategies at arbitrary
>>> points in the graph. I'm thinking we probably should only allow that in
>>> sources but it's not the reality currently. I'm not against renaming it,
>>> just voicing those thoughts.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> [1] https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
>>> [2]
>>>
>>> https://github.com/aljoscha/flink/blob/flink-xxx-wm-generators-rebased/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java#L81
>>>
>>> On 12.05.20 15:48, Stephan Ewen wrote:
>>>> +1 to all of Dawid's suggestions, makes a lot of sense to me
>>>>
>>>> On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz <[hidden email]
>>>>
>>>> wrote:
>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> Sorry for adding comments during the vote, but I have some really minor
>>>>> suggestions that should not influence the voting thread imo.
>>>>>
>>>>> 1) Does it make sense to have the TimestampAssigner extend from Flink's
>>>>> Function? This implies it has to be serializable which with the factory
>>>>> pattern is not strictly necessary, right? BTW I really like that you
>>>>> suggested the FunctionInterface annotation there.
>>>>>
>>>>> 2) Could we rename the IdentityTimestampAssigner to e.g.
>>>>>
>>> RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...
>>>>> Personally I found the IdentityTimestampAssigner a bit misleading as it
>>>>> usually mean a no-op. Which did not click for me, as I assumed it
>>>>> somehow returns the incoming record itself.
>>>>>
>>>>> 3) Could we rename the second parameter of TimestampAssigner#extract to
>>>>> e.g. recordTimestamp/nativeTimestamp? This is similar to the point
>>>>> above. This parameter was also a bit confusing for me as I thought at
>>>>> times its somehow related to
>>>>> TimerService#currentProcessingTimestamp()/currentWatermark() as the
>>>>> whole system currentTimestamp.
>>>>>
>>>>> Other than those three points I like the proposal and I was about to
>>>>> vote +1 if it was not for those three points.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 11/05/2020 16:57, Jark Wu wrote:
>>>>>> Thanks for the explanation. I like the fatory pattern to make the
>>> member
>>>>>> variables immutable and final.
>>>>>>
>>>>>> So +1 to the proposal.
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Mon, 11 May 2020 at 22:01, Stephan Ewen <[hidden email]> wrote:
>>>>>>
>>>>>>> I am fine with that.
>>>>>>>
>>>>>>> Much of the principles seem agreed upon. I understand the need to
>>>>> support
>>>>>>> code-generated extractors and we should support most of it already (as
>>>>>>> Aljoscha mentioned via the factories) can extend this if needed.
>>>>>>>
>>>>>>> I think that the factory approach supports code-generated extractors
>>> in
>>>>> a
>>>>>>> cleaner way even than an extractor with an open/init method.
>>>>>>>
>>>>>>>
>>>>>>> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <[hidden email]
>>>>>>> wrote:
>>>>>>>
>>>>>>>> We're slightly running out of time. I would propose we vote on the
>>>>> basic
>>>>>>>> principle and remain open to later additions. This feature is quite
>>>>>>>> important to make the new Kafka Source that is developed as part of
>>>>>>>> FLIP-27 useful. Otherwise we would have to use the legacy interfaces
>>> in
>>>>>>>> the newly added connector.
>>>>>>>>
>>>>>>>> I know that's a bit unorthodox but would everyone be OK with what's
>>>>>>>> currently there and then we iterate?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On 11.05.20 13:57, Aljoscha Krettek wrote:
>>>>>>>>> Ah, I meant to write this in my previous email, sorry about that.
>>>>>>>>>
>>>>>>>>> The WatermarkStrategy, which is basically a factory for a
>>>>>>>>> WatermarkGenerator is the replacement for the open() method. This is
>>>>>>> the
>>>>>>>>> same strategy that was followed for StreamOperatorFactory, which was
>>>>>>>>> introduced to allow code generation in the Table API [1]. If we need
>>>>>>>>> metrics or other things we would add that as a parameter to the
>>>>> factory
>>>>>>>>> method. What do you think?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11974
>>>>>>>>>
>>>>>>>>> On 10.05.20 05:07, Jark Wu wrote:
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Regarding to the `open()/close()`, I think it's necessary for
>>>>>>>>>> Table&SQL to
>>>>>>>>>> compile the generated code.
>>>>>>>>>> In Table&SQL, the watermark strategy and event-timestamp is defined
>>>>>>>> using
>>>>>>>>>> SQL expressions, we will
>>>>>>>>>> translate and generate Java code for the expressions. If we have
>>>>>>>>>> `open()/close()`, we don't need lazy initialization.
>>>>>>>>>> Besides that, I can see a need to report some metrics, e.g. the
>>>>>>> current
>>>>>>>>>> watermark, the dirty timestamps (null value), etc.
>>>>>>>>>> So I think a simple `open()/close()` with a context which can get
>>>>>>>>>> MetricGroup is nice and not complex for the first version.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jark
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, 10 May 2020 at 00:50, Stephan Ewen <[hidden email]>
>>> wrote:
>>>>>>>>>>> Thanks, Aljoscha, for picking this up.
>>>>>>>>>>>
>>>>>>>>>>> I agree with the approach of doing the set of changes proposed here
>>>>>>>>>>> for now. It already makes things simpler and adds idleness support
>>>>>>>>>>> everywhere.
>>>>>>>>>>>
>>>>>>>>>>> Rich functions and state always add complexity; let's do this in a
>>>>>>>>>>> later step, if we have a really compelling case.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <
>>>>>>> [hidden email]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Regarding the WatermarkGenerator (WG) interface itself. The proposal is
>>>>>>>>>>>> basically to turn emitting into a "flatMap": we give the
>>>>>>>>>>>> WatermarkGenerator a "collector" (the WatermarkOutput) and the WG can
>>>>>>>>>>>> decide whether to output a watermark or not and can also mark the
>>>>>>>>>>>> output as idle. Changing the interface to return a Watermark (as the
>>>>>>>>>>>> previous watermark assigner interface did) would not allow that
>>>>>>>>>>>> flexibility.
>>>>>>>>>>>>
>>>>>>>>>>>> Regarding checkpointing the watermark and keeping track of the minimum
>>>>>>>>>>>> watermark, this would be the responsibility of the framework (or the
>>>>>>>>>>>> KafkaConsumer in the current implementation). The user-supplied WG does
>>>>>>>>>>>> not need to make sure the watermark doesn't regress.
>>>>>>>>>>>>
>>>>>>>>>>>> Regarding making the WG a "rich function", I can see the potential
>>>>>>>>>>>> benefit but I also see a lot of pitfalls. For example, how should the
>>>>>>>>>>>> watermark state be handled in the case of scale-in? It could be made to
>>>>>>>>>>>> work in the Kafka case by attaching the state to the partition state
>>>>>>>>>>>> that we keep, but then we have potential backwards compatibility
>>>>>>>>>>>> problems also for the WM state. Does the WG usually need to keep the
>>>>>>>>>>>> state, or might it be enough if the state is transient? That is, if you
>>>>>>>>>>>> have a restart, the WG would lose its histogram, but it would rebuild it
>>>>>>>>>>>> quickly and you would get back to the same steady state as before.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>
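The collector-style ("flatMap") contract can be sketched as follows. The interfaces are simplified stand-ins, and MarkerGenerator and NonRegressingOutput are hypothetical names: the generator only decides what to emit (here, punctuated on marker events) or whether to go idle, while a framework-side output drops regressing watermarks, so user code does not have to guard against regressions.

```java
import java.util.ArrayList;
import java.util.List;

class CollectorSketch {

    /** Simplified collector: a generator may emit zero or more watermarks, or mark itself idle. */
    interface WatermarkOutput {
        void emitWatermark(long watermark);

        void markIdle();
    }

    interface WatermarkGenerator<T> {
        void onEvent(T event, long eventTimestamp, WatermarkOutput output);

        void onPeriodicEmit(WatermarkOutput output);
    }

    /** Punctuated style: emits only when an event carries a marker, never periodically. */
    static final class MarkerGenerator implements WatermarkGenerator<String> {
        @Override
        public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
            if (event.startsWith("WM")) {
                output.emitWatermark(eventTimestamp);
            }
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // Nothing to do: this generator decides per event.
        }
    }

    /** Hypothetical framework-side output: it, not the user's generator, prevents regressions. */
    static final class NonRegressingOutput implements WatermarkOutput {
        final List<Long> forwarded = new ArrayList<>();
        boolean idle;
        private long current = Long.MIN_VALUE;

        @Override
        public void emitWatermark(long watermark) {
            idle = false; // any emit attempt counts as activity
            if (watermark > current) {
                current = watermark;
                forwarded.add(watermark);
            }
        }

        @Override
        public void markIdle() {
            idle = true;
        }
    }
}
```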
>>>>>>>>>>>> On 27.04.20 12:12, David Anderson wrote:
>>>>>>>>>>>>> Overall I like this proposal; thanks for bringing it forward,
>>>>>>>>>>>>> Aljoscha.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I also like the idea of making the Watermark generator a rich function
>>>>>>>>>>>>> -- this should make it more straightforward to implement smarter
>>>>>>>>>>>>> watermark generators, e.g. one that uses state to keep statistics about
>>>>>>>>>>>>> the actual out-of-orderness, and uses those statistics to implement a
>>>>>>>>>>>>> variable delay.
>>>>>>>>>>>>> David
>>>>>>>>>>>>>
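A rough sketch of such a variable-delay generator, assuming simplified stand-in interfaces and a hypothetical ObservedLatenessGenerator class. The statistic used here (the largest lateness seen so far) is an illustrative choice, not something proposed in the thread; a histogram or percentile would be more robust. The statistic lives in plain fields, i.e. it is transient and would be rebuilt from new events after a restart.

```java
class AdaptiveDelaySketch {

    interface WatermarkOutput {
        void emitWatermark(long watermark);
    }

    interface WatermarkGenerator<T> {
        void onEvent(T event, long eventTimestamp, WatermarkOutput output);

        void onPeriodicEmit(WatermarkOutput output);
    }

    /** Uses the largest lateness observed so far as the watermark delay. */
    static final class ObservedLatenessGenerator<T> implements WatermarkGenerator<T> {
        private long maxTimestamp = Long.MIN_VALUE;
        private long maxObservedLateness = 0;

        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            // An event older than the newest one seen so far tells us how late data can be.
            if (maxTimestamp != Long.MIN_VALUE && eventTimestamp < maxTimestamp) {
                maxObservedLateness = Math.max(maxObservedLateness, maxTimestamp - eventTimestamp);
            }
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // Emit nothing until at least one event has been seen.
            if (maxTimestamp != Long.MIN_VALUE) {
                output.emitWatermark(maxTimestamp - maxObservedLateness - 1);
            }
        }

        long currentDelay() {
            return maxObservedLateness;
        }
    }
}
```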
>>>>>>>>>>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <
>>>>>>> [hidden email]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for opening the discussion!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have two comments on the FLIP:
>>>>>>>>>>>>>> 1) we could add lifecycle methods to the Generator, i.e. open()/
>>>>>>>>>>>>>> close(), probably with a Context as argument: I have not fully thought
>>>>>>>>>>>>>> this through but I think that this is more aligned with the rest of
>>>>>>>>>>>>>> our rich functions. In addition, it will allow us, for example, to
>>>>>>>>>>>>>> initialize the Watermark value, if we decide to checkpoint the
>>>>>>>>>>>>>> watermark (see [1]) (I also do not know if Table/SQL needs to do
>>>>>>>>>>>>>> anything in the open()).
>>>>>>>>>>>>>> 2) aligned with the above, and with the case where we want to
>>>>>>>>>>>>>> checkpoint the watermark in mind, I am wondering about how we could
>>>>>>>>>>>>>> implement this in the future. In the FLIP, it is proposed to expose
>>>>>>>>>>>>>> the WatermarkOutput in the methods of the WatermarkGenerator. Given
>>>>>>>>>>>>>> that there is the implicit contract that watermarks are
>>>>>>>>>>>>>> non-decreasing, the WatermarkOutput#emitWatermark() will have (I
>>>>>>>>>>>>>> assume) a check that will compare the last emitted WM against the
>>>>>>>>>>>>>> provided one, and emit it only if it is >=. If not, then we risk
>>>>>>>>>>>>>> users shooting themselves in the foot if they accidentally forget
>>>>>>>>>>>>>> the check. Given that the WatermarkGenerator and its caller do not
>>>>>>>>>>>>>> know if the watermark was finally emitted or not (the
>>>>>>>>>>>>>> WatermarkOutput#emitWatermark returns void), who will be responsible
>>>>>>>>>>>>>> for checkpointing the WM?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Given this, why not define the methods as:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public interface WatermarkGenerator<T> {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         Watermark onPeriodicEmit(WatermarkOutput output);
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> and the caller will be the one enforcing any invariants, such as
>>>>>>>>>>>>>> non-decreasing watermarks. In this way, the caller can checkpoint
>>>>>>>>>>>>>> anything that is needed, as it will have complete knowledge of whether
>>>>>>>>>>>>>> the WM was emitted or not.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
>>>>>>>>>>>>>>
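The return-value alternative can be sketched like this. It assumes a simplified variant of the signature above that drops the WatermarkOutput parameter (it would still be needed for marking idleness) and uses null for "no new watermark"; ReturningWatermarkGenerator and WatermarkEmitter are hypothetical names. The caller enforces the non-decreasing invariant and always knows the last emitted value, which is what a checkpoint would store.

```java
import java.util.ArrayList;
import java.util.List;

class CallerEnforcedSketch {

    /** Hypothetical return-value style: null means "no new watermark". */
    interface ReturningWatermarkGenerator<T> {
        Long onEvent(T event, long eventTimestamp);

        Long onPeriodicEmit();
    }

    /** Caller that owns the non-decreasing invariant and the checkpointable value. */
    static final class WatermarkEmitter<T> {
        private final ReturningWatermarkGenerator<T> generator;
        private final List<Long> downstream = new ArrayList<>();
        private long lastEmitted = Long.MIN_VALUE;

        WatermarkEmitter(ReturningWatermarkGenerator<T> generator) {
            this.generator = generator;
        }

        void processEvent(T event, long timestamp) {
            maybeEmit(generator.onEvent(event, timestamp));
        }

        void onTimer() {
            maybeEmit(generator.onPeriodicEmit());
        }

        private void maybeEmit(Long candidate) {
            // Only forward watermarks that advance time; the caller always knows what was emitted.
            if (candidate != null && candidate > lastEmitted) {
                lastEmitted = candidate;
                downstream.add(candidate);
            }
        }

        /** The value a checkpoint would store. */
        long snapshotWatermark() {
            return lastEmitted;
        }

        List<Long> emitted() {
            return downstream;
        }
    }
}
```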
>>>>>>>>>>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <
>>> [hidden email]
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> Thanks for the proposal Aljoscha. This is a very useful unification. We
>>>>>>>>>>>>>>> have considered this FLIP already in the interfaces for FLIP-95 [1] and
>>>>>>>>>>>>>>> look forward to updating to the new unified watermark generators once
>>>>>>>>>>>>>>> FLIP-126 has been accepted.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://github.com/apache/flink/pull/11692
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
>>>>>>>>>>>>>>>> Hi Everyone!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and
>>>>>>>>>>> separate)
>>>>>>>>>>>>>>>> Watermark Assigners" [1]. This work was started by Stephan in
>>>>> an
>>>>>>>>>>>>>>>> experimental branch. I expanded on that work to provide a PoC
>>>>>>> for
>>>>>>>>>>> the
>>>>>>>>>>>>>>>> changes proposed in this FLIP: [2].
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Currently, we have two different flavours of Watermark
>>>>>>>>>>>>>>>> Assigners: AssignerWithPunctuatedWatermarks
>>>>>>>>>>>>>>>> and AssignerWithPeriodicWatermarks. Both of them extend
>>>>>>>>>>>>>>>> from TimestampAssigner. This means that sources that want to
>>>>>>>>>>>>>>>> support watermark assignment/extraction in the source need to
>>>>>>>>>>>>>>>> support two separate interfaces, and we have two operator
>>>>>>>>>>>>>>>> implementations for the different flavours. Also, this makes
>>>>>>>>>>>>>>>> features such as generic support for idleness detection more
>>>>>>>>>>>>>>>> complicated to implement because we again have to support two
>>>>>>>>>>>>>>>> types of watermark assigners.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In this FLIP we propose two things:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1) Unify the Watermark Assigners into one interface,
>>>>>>>>>>>>>>>>    WatermarkGenerator.
>>>>>>>>>>>>>>>> 2) Separate this new interface from the TimestampAssigner.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The motivation for the first is to simplify future implementations
>>>>>>>>>>>>>>>> and reduce code duplication. The motivation for the second point is
>>>>>>>>>>>>>>>> again code deduplication: most assigners currently have to extend
>>>>>>>>>>>>>>>> from some base timestamp extractor or duplicate the extraction
>>>>>>>>>>>>>>>> logic, or users have to override an abstract method of the
>>>>>>>>>>>>>>>> watermark assigner to provide the timestamp extraction logic.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Additionally, we propose to add a generic wrapping
>>>>>>>>>>> WatermarkGenerator
>>>>>>>>>>>>>>>> that provides idleness detection, i.e. it can mark a
>>>>>>>>>>> stream/partition
>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>> idle if no data arrives after a configured timeout.
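A minimal sketch of such a generic idleness wrapper, assuming simplified stand-in interfaces and a hypothetical WithIdleness class. The clock is injected as a LongSupplier (an assumption made so the behaviour is easy to test; in practice it would be wall-clock time). While idle, the wrapper does not delegate periodic emits, on the assumption that an emitted watermark would mark the output active again.

```java
import java.util.function.LongSupplier;

class IdlenessSketch {

    interface WatermarkOutput {
        void emitWatermark(long watermark);

        void markIdle();
    }

    interface WatermarkGenerator<T> {
        void onEvent(T event, long eventTimestamp, WatermarkOutput output);

        void onPeriodicEmit(WatermarkOutput output);
    }

    /** Wraps any generator and marks the output idle when no event arrives within the timeout. */
    static final class WithIdleness<T> implements WatermarkGenerator<T> {
        private final WatermarkGenerator<T> delegate;
        private final LongSupplier clock; // injected so the sketch is testable
        private final long idleTimeoutMillis;
        private long lastActivity;
        private boolean idle;

        WithIdleness(WatermarkGenerator<T> delegate, LongSupplier clock, long idleTimeoutMillis) {
            this.delegate = delegate;
            this.clock = clock;
            this.idleTimeoutMillis = idleTimeoutMillis;
            this.lastActivity = clock.getAsLong();
        }

        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            lastActivity = clock.getAsLong();
            idle = false;
            delegate.onEvent(event, eventTimestamp, output);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            if (clock.getAsLong() - lastActivity >= idleTimeoutMillis) {
                if (!idle) { // signal idleness once, then stay quiet until data arrives
                    idle = true;
                    output.markIdle();
                }
            } else {
                delegate.onPeriodicEmit(output);
            }
        }
    }
}
```

Because the wrapper only depends on the generator interface, it works for any punctuated or periodic generator, which is the point of unifying the two flavours first.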
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The "unify and separate" part refers to the fact that we want
>>>>> to
>>>>>>>>>>> unify
>>>>>>>>>>>>>>>> punctuated and periodic assigners but at the same time split
>>>>> the
>>>>>>>>>>>>>>>> timestamp assigner from the watermark generator.
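To illustrate the "unify and separate" idea, here is a sketch of a hypothetical operator loop (TimestampsAndWatermarks) built on simplified stand-in interfaces, not Flink's actual classes. The timestamp assigner and the watermark generator are independent objects that can be combined freely, and a single code path serves both punctuated generators (which emit in onEvent) and periodic ones (which emit in onPeriodicEmit).

```java
import java.util.ArrayList;
import java.util.List;

class UnifySeparateSketch {

    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    interface WatermarkOutput {
        void emitWatermark(long watermark);
    }

    interface WatermarkGenerator<T> {
        void onEvent(T event, long eventTimestamp, WatermarkOutput output);

        void onPeriodicEmit(WatermarkOutput output);
    }

    /** Hypothetical operator: one code path for every generator flavour. */
    static final class TimestampsAndWatermarks<T> {
        private final TimestampAssigner<T> assigner;
        private final WatermarkGenerator<T> generator;
        final List<Long> timestamps = new ArrayList<>();
        final List<Long> watermarks = new ArrayList<>();

        TimestampsAndWatermarks(TimestampAssigner<T> assigner, WatermarkGenerator<T> generator) {
            this.assigner = assigner;
            this.generator = generator;
        }

        void processElement(T element, long recordTimestamp) {
            long ts = assigner.extractTimestamp(element, recordTimestamp);
            timestamps.add(ts);
            generator.onEvent(element, ts, watermarks::add); // punctuated generators emit here
        }

        void onPeriodicTimer() {
            generator.onPeriodicEmit(watermarks::add); // periodic generators emit here
        }
    }
}
```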
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Please find more details in the FLIP [1]. Looking forward to
>>>>>>>>>>>>>>>> your feedback.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
>>>>>>>>>>>>>>>> [2]
>>> https://github.com/aljoscha/flink/tree/stephan-event-time
>>>>>
>>>
>


Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

dwysakowicz
Thank you for the update and sorry again for chiming in so late...

Best,

Dawid


On 12/05/2020 18:21, Aljoscha Krettek wrote:

> Yes, I am also ok with a SerializableTimestampAssigner. This only
> looks a bit clumsy in the API but as a user (that uses lambdas) you
> should not see this. I pushed changes for this to my branch:
> https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
>
> And yes, recordTimestamp sounds good for the TimestampAssigner. I
> admit I didn't read this well enough and only saw nativeTimestamp.
>
> Best,
> Aljoscha
>
> On 12.05.20 17:16, Dawid Wysakowicz wrote:
>> I have similar thoughts to @Stephan
>>
>> Ad. 1 I tried something like this on your branch:
>>
>>     /**
>>      * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
>>      * For top-level classes that implement both Serializable and TimestampAssigner.
>>      */
>>     public <TA extends TimestampAssigner<T> & Serializable> WatermarkStrategies<T> withTimestampAssigner(
>>             TA timestampAssigner) {
>>         checkNotNull(timestampAssigner, "timestampAssigner");
>>         this.timestampAssigner = timestampAssigner;
>>         return this;
>>     }
>>
>>     @FunctionalInterface
>>     public interface SerializableTimestampAssigner<T>
>>             extends TimestampAssigner<T>, Serializable {
>>     }
>>
>>     /**
>>      * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
>>      * Helper method for serializable lambdas.
>>      */
>>     public WatermarkStrategies<T> withTimestampAssigner(
>>             SerializableTimestampAssigner<T> timestampAssigner) {
>>         checkNotNull(timestampAssigner, "timestampAssigner");
>>         this.timestampAssigner = timestampAssigner;
>>         return this;
>>     }
>>
>> But I understand if that's too hacky. It's just a pity that we must
>> enforce limitations on an interface that are not strictly necessary.
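The SerializableTimestampAssigner mix-in relies on a standard Java rule: a lambda is serializable when its target type extends Serializable. The sketch below uses simplified stand-in interfaces and round-trips such a lambda through Java serialization; roundTrip is a hypothetical helper standing in for how user functions get shipped to the cluster. A lambda targeting the plain TimestampAssigner is not Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializableAssignerSketch {

    @FunctionalInterface
    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    /** Mixes in Serializable so that lambdas targeting this type are serializable. */
    @FunctionalInterface
    interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {
    }

    /** Serializes and deserializes an object with plain Java serialization. */
    @SuppressWarnings("unchecked")
    static <O> O roundTrip(O object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (O) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

For users writing lambdas, the only visible difference is the parameter type of the method that accepts the assigner.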
>>
>> Ad 2/3
>>
>> I am aware the watermark assigner/timestamp extractor can be applied
>> further down the graph. Originally I also wanted to suggest
>> sourceTimestamp and SourceTimestampAssigner, but then I realized it can
>> be used also after the sources, as you correctly pointed out. Even if the
>> TimestampAssigner is used after the source, there might be some
>> native/record timestamp in the StreamRecord that could've been
>> extracted by a previous assigner.
>>
>> Best,
>>
>> Dawid
>>
>> On 12/05/2020 16:47, Stephan Ewen wrote:
>>> @Aljoscha
>>>
>>> About (1) could we have an interface SerializableTimestampAssigner that
>>> simply mixes in the java.io.Serializable interface? Or will this be too
>>> clumsy?
>>>
>>> About (3) RecordTimestamp seems to fit both cases (in-source-record
>>> timestamp, in-stream-record timestamp).
>>>
>>> On Tue, May 12, 2020 at 4:12 PM Aljoscha Krettek <[hidden email]>
>>> wrote:
>>>
>>>> Definitely +1 to point 2) raised by Dawid. I'm not sure about points 1)
>>>> and 3).
>>>>
>>>> 1) I can see the benefit of that, but in reality most timestamp assigners
>>>> will probably need to be Serializable. If you look at my (updated) POC
>>>> branch [1] you can see how a TimestampAssigner would be specified on the
>>>> WatermarkStrategies helper class: [2]. The signature of this would have
>>>> to be changed to something like:
>>>>
>>>> public <TA extends TimestampAssigner<T> & Serializable> WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner)
>>>>
>>>> Then, however, it would not be possible for users to specify a lambda or
>>>> anonymous inner class for the TimestampAssigner like this:
>>>>
>>>> WatermarkStrategy<Long> testWmStrategy = WatermarkStrategies
>>>>                  .forGenerator(new PeriodicTestWatermarkGenerator())
>>>>                  .withTimestampAssigner((event, timestamp) -> event)
>>>>                  .build();
>>>>
>>>> 3) This makes sense if we only allow WatermarkStrategies on sources,
>>>> where the previous timestamp really is the "native" timestamp.
>>>> Currently, we also allow setting watermark strategies at arbitrary
>>>> points in the graph. I'm thinking we probably should only allow
>>>> that in
>>>> sources but it's not the reality currently. I'm not against
>>>> renaming it,
>>>> just voicing those thoughts.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>> [1]
>>>> https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
>>>> [2]
>>>>
>>>> https://github.com/aljoscha/flink/blob/flink-xxx-wm-generators-rebased/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java#L81
>>>>
>>>>
>>>> On 12.05.20 15:48, Stephan Ewen wrote:
>>>>> +1 to all of Dawid's suggestions, makes a lot of sense to me
>>>>>
>>>>> On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz
>>>>> <[hidden email]
>>>>>
>>>>> wrote:
>>>>>
>>>>>> Hi Aljoscha,
>>>>>>
>>>>>> Sorry for adding comments during the vote, but I have some really minor
>>>>>> suggestions that should not influence the voting thread imo.
>>>>>>
>>>>>> 1) Does it make sense to have the TimestampAssigner extend from Flink's
>>>>>> Function? This implies it has to be serializable, which with the factory
>>>>>> pattern is not strictly necessary, right? BTW I really like that you
>>>>>> suggested the FunctionalInterface annotation there.
>>>>>>
>>>>>> 2) Could we rename the IdentityTimestampAssigner to e.g.
>>>>>> RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...
>>>>>> Personally I found the IdentityTimestampAssigner a bit misleading, as
>>>>>> "identity" usually means a no-op. The name did not click for me, as I
>>>>>> assumed it somehow returns the incoming record itself.
>>>>>>
>>>>>> 3) Could we rename the second parameter of TimestampAssigner#extract to
>>>>>> e.g. recordTimestamp/nativeTimestamp? This is similar to the point
>>>>>> above. This parameter was also a bit confusing for me, as I thought at
>>>>>> times it's somehow related to
>>>>>> TimerService#currentProcessingTimestamp()/currentWatermark() as the
>>>>>> whole-system currentTimestamp.
>>>>>>
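A sketch of the two ways the recordTimestamp parameter can be used, assuming simplified stand-in types: a RecordTimestampAssigner (one of the names suggested above for the former IdentityTimestampAssigner) keeps the timestamp already attached to the record, while a lambda extracts one from the event itself. Click is a hypothetical event type, and treating Long.MIN_VALUE as "no timestamp" is an assumption of this sketch.

```java
class RecordTimestampSketch {

    interface TimestampAssigner<T> {
        /**
         * @param element the element to extract a timestamp from
         * @param recordTimestamp the timestamp already attached to the record (for example by the
         *     source or by an earlier assigner); this sketch assumes Long.MIN_VALUE means "none"
         */
        long extractTimestamp(T element, long recordTimestamp);
    }

    /** Keeps whatever timestamp the record already carries (the former IdentityTimestampAssigner). */
    static final class RecordTimestampAssigner<T> implements TimestampAssigner<T> {
        @Override
        public long extractTimestamp(T element, long recordTimestamp) {
            return recordTimestamp;
        }
    }

    /** Hypothetical event type with its own event-time field. */
    static final class Click {
        final long clickTime;

        Click(long clickTime) {
            this.clickTime = clickTime;
        }
    }
}
```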
>>>>>> Other than those three points I like the proposal and I was about to
>>>>>> vote +1 if it was not for those three points.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dawid
>>>>>>
>>>>>> On 11/05/2020 16:57, Jark Wu wrote:
>>>>>>> Thanks for the explanation. I like the factory pattern to make the member
>>>>>>> variables immutable and final.
>>>>>>>
>>>>>>> So +1 to the proposal.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Mon, 11 May 2020 at 22:01, Stephan Ewen <[hidden email]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I am fine with that.
>>>>>>>>
>>>>>>>> Most of the principles seem agreed upon. I understand the need to support
>>>>>>>> code-generated extractors, and we should support most of it already (as
>>>>>>>> Aljoscha mentioned, via the factories) and can extend this if needed.
>>>>>>>>
>>>>>>>> I think that the factory approach supports code-generated extractors in
>>>>>>>> a cleaner way even than an extractor with an open/init method.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek
>>>>>>>> <[hidden email]
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> We're slightly running out of time. I would propose we vote on
>>>>>>>>> the
>>>>>> basic
>>>>>>>>> principle and remain open to later additions. This feature is
>>>>>>>>> quite
>>>>>>>>> important to make the new Kafka Source that is developed as
>>>>>>>>> part of
>>>>>>>>> FLIP-27 useful. Otherwise we would have to use the legacy
>>>>>>>>> interfaces
>>>> in
>>>>>>>>> the newly added connector.
>>>>>>>>>
>>>>>>>>> I know that's a bit unorthodox but would everyone be OK with
>>>>>>>>> what's
>>>>>>>>> currently there and then we iterate?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>> On 11.05.20 13:57, Aljoscha Krettek wrote:
>>>>>>>>>> Ah, I meant to write this in my previous email, sorry about
>>>>>>>>>> that.
>>>>>>>>>>
>>>>>>>>>> The WatermarkStrategy, which is basically a factory for a
>>>>>>>>>> WatermarkGenerator is the replacement for the open() method.
>>>>>>>>>> This is
>>>>>>>> the
>>>>>>>>>> same strategy that was followed for StreamOperatorFactory,
>>>>>>>>>> which was
>>>>>>>>>> introduced to allow code generation in the Table API [1]. If
>>>>>>>>>> we need
>>>>>>>>>> metrics or other things we would add that as a parameter to the
>>>>>> factory
>>>>>>>>>> method. What do you think?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11974
>>>>>>>>>>
>>>>>>>>>> On 10.05.20 05:07, Jark Wu wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Regarding `open()/close()`, I think they are necessary for Table&SQL
>>>>>>>>>>> to compile the generated code.
>>>>>>>>>>> In Table&SQL, the watermark strategy and the event timestamp are
>>>>>>>>>>> defined using SQL expressions, which we translate into generated Java
>>>>>>>>>>> code. If we have `open()/close()`, we don't need lazy initialization.
>>>>>>>>>>> Besides that, I can see a need to report some metrics, e.g. the current
>>>>>>>>>>> watermark, the dirty timestamps (null values), etc.
>>>>>>>>>>> So I think a simple `open()/close()` with a context that can provide a
>>>>>>>>>>> MetricGroup is nice and not complex for the first version.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jark
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sun, 10 May 2020 at 00:50, Stephan Ewen <[hidden email]>
>>>> wrote:
>>>>>>>>>>>> Thanks, Aljoscha, for picking this up.
>>>>>>>>>>>>
>>>>>>>>>>>> I agree with the approach of doing the set of changes proposed here
>>>>>>>>>>>> for now. It already makes things simpler and adds idleness support
>>>>>>>>>>>> everywhere.
>>>>>>>>>>>>
>>>>>>>>>>>> Rich functions and state always add complexity; let's do this in a
>>>>>>>>>>>> later step, if we have a really compelling case.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <
>>>>>>>> [hidden email]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding the WatermarkGenerator (WG) interface itself. The proposal is
>>>>>>>>>>>>> basically to turn emitting into a "flatMap": we give the
>>>>>>>>>>>>> WatermarkGenerator a "collector" (the WatermarkOutput) and the WG can
>>>>>>>>>>>>> decide whether to output a watermark or not and can also mark the
>>>>>>>>>>>>> output as idle. Changing the interface to return a Watermark (as the
>>>>>>>>>>>>> previous watermark assigner interface did) would not allow that
>>>>>>>>>>>>> flexibility.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding checkpointing the watermark and keeping track of the minimum
>>>>>>>>>>>>> watermark, this would be the responsibility of the framework (or the
>>>>>>>>>>>>> KafkaConsumer in the current implementation). The user-supplied WG does
>>>>>>>>>>>>> not need to make sure the watermark doesn't regress.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding making the WG a "rich function", I can see the potential
>>>>>>>>>>>>> benefit but I also see a lot of pitfalls. For example, how should the
>>>>>>>>>>>>> watermark state be handled in the case of scale-in? It could be made to
>>>>>>>>>>>>> work in the Kafka case by attaching the state to the partition state
>>>>>>>>>>>>> that we keep, but then we have potential backwards compatibility
>>>>>>>>>>>>> problems also for the WM state. Does the WG usually need to keep the
>>>>>>>>>>>>> state, or might it be enough if the state is transient? That is, if you
>>>>>>>>>>>>> have a restart, the WG would lose its histogram, but it would rebuild it
>>>>>>>>>>>>> quickly and you would get back to the same steady state as before.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 27.04.20 12:12, David Anderson wrote:
>>>>>>>>>>>>>> Overall I like this proposal; thanks for bringing it
>>>>>>>>>>>>>> forward,
>>>>>>>>>>>>>> Aljoscha.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I also like the idea of making the Watermark generator a rich function
>>>>>>>>>>>>>> -- this should make it more straightforward to implement smarter
>>>>>>>>>>>>>> watermark generators, e.g. one that uses state to keep statistics about
>>>>>>>>>>>>>> the actual out-of-orderness, and uses those statistics to implement a
>>>>>>>>>>>>>> variable delay.
>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <
>>>>>>>> [hidden email]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for opening the discussion!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have two comments on the FLIP:
>>>>>>>>>>>>>>> 1) we could add lifecycle methods to the Generator, i.e. open()/
>>>>>>>>>>>>>>> close(), probably with a Context as argument: I have not fully thought
>>>>>>>>>>>>>>> this through but I think that this is more aligned with the rest of
>>>>>>>>>>>>>>> our rich functions. In addition, it will allow us, for example, to
>>>>>>>>>>>>>>> initialize the Watermark value, if we decide to checkpoint the
>>>>>>>>>>>>>>> watermark (see [1]) (I also do not know if Table/SQL needs to do
>>>>>>>>>>>>>>> anything in the open()).
>>>>>>>>>>>>>>> 2) aligned with the above, and with the case where we want to
>>>>>>>>>>>>>>> checkpoint the watermark in mind, I am wondering about how we could
>>>>>>>>>>>>>>> implement this in the future. In the FLIP, it is proposed to expose
>>>>>>>>>>>>>>> the WatermarkOutput in the methods of the WatermarkGenerator. Given
>>>>>>>>>>>>>>> that there is the implicit contract that watermarks are
>>>>>>>>>>>>>>> non-decreasing, the WatermarkOutput#emitWatermark() will have (I
>>>>>>>>>>>>>>> assume) a check that will compare the last emitted WM against the
>>>>>>>>>>>>>>> provided one, and emit it only if it is >=. If not, then we risk
>>>>>>>>>>>>>>> users shooting themselves in the foot if they accidentally forget
>>>>>>>>>>>>>>> the check. Given that the WatermarkGenerator and its caller do not
>>>>>>>>>>>>>>> know if the watermark was finally emitted or not (the
>>>>>>>>>>>>>>> WatermarkOutput#emitWatermark returns void), who will be responsible
>>>>>>>>>>>>>>> for checkpointing the WM?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Given this, why not define the methods as:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> public interface WatermarkGenerator<T> {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         Watermark onPeriodicEmit(WatermarkOutput output);
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> and the caller will be the one enforcing any invariants, such as
>>>>>>>>>>>>>>> non-decreasing watermarks. In this way, the caller can checkpoint
>>>>>>>>>>>>>>> anything that is needed, as it will have complete knowledge of whether
>>>>>>>>>>>>>>> the WM was emitted or not.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <
>>>> [hidden email]
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> Thanks for the proposal Aljoscha. This is a very useful unification. We
>>>>>>>>>>>>>>>> have considered this FLIP already in the interfaces for FLIP-95 [1] and
>>>>>>>>>>>>>>>> look forward to updating to the new unified watermark generators once
>>>>>>>>>>>>>>>> FLIP-126 has been accepted.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1] https://github.com/apache/flink/pull/11692
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
>>>>>>>>>>>>>>>>> Hi Everyone!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We would like to start a discussion on "FLIP-126:
>>>>>>>>>>>>>>>>> Unify (and
>>>>>>>>>>>> separate)
>>>>>>>>>>>>>>>>> Watermark Assigners" [1]. This work was started by
>>>>>>>>>>>>>>>>> Stephan in
>>>>>> an
>>>>>>>>>>>>>>>>> experimental branch. I expanded on that work to
>>>>>>>>>>>>>>>>> provide a PoC
>>>>>>>> for
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> changes proposed in this FLIP: [2].
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Currently, we have two different flavours of Watermark
>>>>>>>>>>>>>>>>> Assigners: AssignerWithPunctuatedWatermarks
>>>>>>>>>>>>>>>>> and AssignerWithPeriodicWatermarks. Both of them extend
>>>>>>>>>>>>>>>>> from TimestampAssigner. This means that sources that want to
>>>>>>>>>>>>>>>>> support watermark assignment/extraction in the source need to
>>>>>>>>>>>>>>>>> support two separate interfaces, and we have two operator
>>>>>>>>>>>>>>>>> implementations for the different flavours. Also, this makes
>>>>>>>>>>>>>>>>> features such as generic support for idleness detection more
>>>>>>>>>>>>>>>>> complicated to implement because we again have to support two
>>>>>>>>>>>>>>>>> types of watermark assigners.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In this FLIP we propose two things:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1) Unify the Watermark Assigners into one interface,
>>>>>>>>>>>>>>>>>    WatermarkGenerator.
>>>>>>>>>>>>>>>>> 2) Separate this new interface from the TimestampAssigner.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The motivation for the first is to simplify future implementations
>>>>>>>>>>>>>>>>> and reduce code duplication. The motivation for the second point is
>>>>>>>>>>>>>>>>> again code deduplication: most assigners currently have to extend
>>>>>>>>>>>>>>>>> from some base timestamp extractor or duplicate the extraction
>>>>>>>>>>>>>>>>> logic, or users have to override an abstract method of the
>>>>>>>>>>>>>>>>> watermark assigner to provide the timestamp extraction logic.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Additionally, we propose to add a generic wrapping
>>>>>>>>>>>> WatermarkGenerator
>>>>>>>>>>>>>>>>> that provides idleness detection, i.e. it can mark a
>>>>>>>>>>>> stream/partition
>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>> idle if no data arrives after a configured timeout.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The "unify and separate" part refers to the fact that
>>>>>>>>>>>>>>>>> we want
>>>>>> to
>>>>>>>>>>>> unify
>>>>>>>>>>>>>>>>> punctuated and periodic assigners but at the same time
>>>>>>>>>>>>>>>>> split
>>>>>> the
>>>>>>>>>>>>>>>>> timestamp assigner from the watermark generator.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Please find more details in the FLIP [1]. Looking
>>>>>>>>>>>>>>>>> forward to
>>>>>>>>>>>>>>>>> your feedback.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
>>>>
>>>>>>>>>>>>>>>>> [2]
>>>> https://github.com/aljoscha/flink/tree/stephan-event-time
>>>>>>
>>>>
>>
>



Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

张光辉
Hi @Aljoscha,
the Javadoc comment on the function parameter still says currentTimestamp,
which does not match the parameter name recordTimestamp in
"long extractTimestamp(T element, long recordTimestamp)" on the wiki.

Best,
Zhangguanghui

On Wed, May 13, 2020 at 12:28 AM Dawid Wysakowicz <[hidden email]> wrote:

> Thank you for the update and sorry again for chiming in so late...
>
> Best,
>
> Dawid
>
>
> On 12/05/2020 18:21, Aljoscha Krettek wrote:
> > Yes, I am also ok with a SerializableTimestampAssigner. This only
> > looks a bit clumsy in the API but as a user (that uses lambdas) you
> > should not see this. I pushed changes for this to my branch:
> > https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
> >
> > And yes, recordTimestamp sounds good for the TimestampAssigner. I
> > admit I didn't read this well enough and only saw nativeTimestamp.
> >
> > Best,
> > Aljoscha
> >
> > On 12.05.20 17:16, Dawid Wysakowicz wrote:
> >> I have similar thoughts to @Stephan
> >>
> >> Ad. 1 I tried something like this on your branch:
> >>
> >>     /**
> >>      * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
> >>      * For top-level classes that implement both Serializable and TimestampAssigner.
> >>      */
> >>     public <TA extends TimestampAssigner<T> & Serializable> WatermarkStrategies<T> withTimestampAssigner(
> >>             TA timestampAssigner) {
> >>         checkNotNull(timestampAssigner, "timestampAssigner");
> >>         this.timestampAssigner = timestampAssigner;
> >>         return this;
> >>     }
> >>
> >>     @FunctionalInterface
> >>     public interface SerializableTimestampAssigner<T>
> >>             extends TimestampAssigner<T>, Serializable {
> >>     }
> >>
> >>     /**
> >>      * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
> >>      * Helper method for serializable lambdas.
> >>      */
> >>     public WatermarkStrategies<T> withTimestampAssigner(
> >>             SerializableTimestampAssigner<T> timestampAssigner) {
> >>         checkNotNull(timestampAssigner, "timestampAssigner");
> >>         this.timestampAssigner = timestampAssigner;
> >>         return this;
> >>     }
> >>
> >> But I understand if that's too hacky. It's just a pity that we must
> >> enforce limitations on an interface that are not strictly necessary.
> >>
> >> Ad 2/3
> >>
> >> I am aware the watermark assigner/timestamp extractor can be applied
> >> further down the graph. Originally I also wanted to suggest
> >> sourceTimestamp and SourceTimestampAssigner, but then I realized it can
> >> be used also after the sources as you correctly pointed out. Even if the
> >> TimestampAssigner is used after the source, there might be some
> >> native/record timestamp in the StreamRecord that could have been
> >> extracted by a previous assigner.
> >>
> >> Best,
> >>
> >> Dawid
> >>
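A minimal sketch of the SerializableTimestampAssigner idea discussed here, in plain Java so it runs without Flink. The nested interfaces are simplified stand-ins for the PoC types, and `withTimestampAssigner` is a hypothetical static helper rather than the real WatermarkStrategies method. It shows why the mix-in is invisible to lambda users: a lambda whose target type extends Serializable is compiled as a serializable lambda.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializableAssignerSketch {

    // Simplified stand-in for the FLIP's TimestampAssigner (not Flink's actual source).
    @FunctionalInterface
    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // Mix-in: adds Serializable without changing the single abstract method,
    // so lambdas targeting this type are compiled as serializable lambdas.
    @FunctionalInterface
    interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {}

    // Hypothetical helper standing in for WatermarkStrategies#withTimestampAssigner.
    static <T> TimestampAssigner<T> withTimestampAssigner(SerializableTimestampAssigner<T> assigner) {
        return assigner;
    }

    // Returns true if plain Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Same shape as the lambda in the thread: the event itself is the timestamp.
        TimestampAssigner<Long> assigner = withTimestampAssigner((event, recordTimestamp) -> event);
        System.out.println(assigner.extractTimestamp(42L, -1L)); // 42
        System.out.println(isSerializable(assigner));            // true
    }
}
```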
> >> On 12/05/2020 16:47, Stephan Ewen wrote:
> >>> @Aljoscha
> >>>
> >>> About (1) could we have an interface SerializableTimestampAssigner that
> >>> simply mixes in the java.io.Serializable interface? Or will this be too
> >>> clumsy?
> >>>
> >>> About (3) RecordTimeStamp seems to fit both cases (in-source-record
> >>> timestamp, in stream-record timestamp).
> >>>
> >>> On Tue, May 12, 2020 at 4:12 PM Aljoscha Krettek <[hidden email]>
> >>> wrote:
> >>>
> >>>> Definitely +1 to point 2) raised by Dawid. I'm not sure about points
> >>>> 1) and 3).
> >>>>
> >>>> 1) I can see the benefit of that, but in reality most timestamp
> >>>> assigners will probably need to be Serializable. If you look at my
> >>>> (updated) POC branch [1], you can see how a TimestampAssigner would be
> >>>> specified on the WatermarkStrategies helper class: [2]. The signature
> >>>> of this would have to be changed to something like:
> >>>>
> >>>> public <TA extends TimestampAssigner<T> & Serializable>
> >>>> WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner)
> >>>>
> >>>> Then, however, it would not be possible for users to specify a lambda
> >>>> or an anonymous inner class for the TimestampAssigner like this:
> >>>>
> >>>> WatermarkStrategy<Long> testWmStrategy = WatermarkStrategies
> >>>>                 .forGenerator(new PeriodicTestWatermarkGenerator())
> >>>>                 .withTimestampAssigner((event, timestamp) -> event)
> >>>>                 .build();
> >>>>
> >>>> 3) This makes sense if we only allow WatermarkStrategies on sources,
> >>>> where the previous timestamp really is the "native" timestamp.
> >>>> Currently, we also allow setting watermark strategies at arbitrary
> >>>> points in the graph. I'm thinking we probably should only allow that
> >>>> in sources, but that's not the reality currently. I'm not against
> >>>> renaming it, just voicing those thoughts.
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>>
> >>>> [1]
> >>>>
> https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
> >>>> [2]
> >>>>
> >>>>
> https://github.com/aljoscha/flink/blob/flink-xxx-wm-generators-rebased/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java#L81
> >>>>
> >>>>
> >>>> On 12.05.20 15:48, Stephan Ewen wrote:
> >>>>> +1 to all of Dawid's suggestions, makes a lot of sense to me
> >>>>>
> >>>>> On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz
> >>>>> <[hidden email]
> >>>>>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Aljoscha,
> >>>>>>
> >>>>>> Sorry for adding comments during the vote, but I have some really
> >>>>>> minor suggestions that should not influence the voting thread, imo.
> >>>>>>
> >>>>>> 1) Does it make sense to have the TimestampAssigner extend from
> >>>>>> Flink's Function? This implies it has to be serializable, which, with
> >>>>>> the factory pattern, is not strictly necessary, right? BTW, I really
> >>>>>> like that you suggested the FunctionalInterface annotation there.
> >>>>>>
> >>>>>> 2) Could we rename the IdentityTimestampAssigner to e.g.
> >>>>>> RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...
> >>>>>> Personally, I found the name IdentityTimestampAssigner a bit
> >>>>>> misleading, as "identity" usually means a no-op. That did not click
> >>>>>> for me, as I assumed it somehow returns the incoming record itself.
> >>>>>>
> >>>>>> 3) Could we rename the second parameter of TimestampAssigner#extract
> >>>>>> to e.g. recordTimestamp/nativeTimestamp? This is similar to the point
> >>>>>> above. This parameter was also a bit confusing for me, as at times I
> >>>>>> thought it was somehow related to
> >>>>>> TimerService#currentProcessingTimestamp()/currentWatermark(), i.e. the
> >>>>>> system-wide current timestamp.
> >>>>>>
> >>>>>> Other than those three points I like the proposal, and I would have
> >>>>>> voted +1 if it were not for them.
> >>>>>>
> >>>>>> Best,
> >>>>>>
> >>>>>> Dawid
> >>>>>>
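To illustrate points 2) and 3) above, here is a small sketch using a simplified TimestampAssigner with the proposed `recordTimestamp` parameter name. `RecordTimestampAssigner` shows what the assigner formerly called IdentityTimestampAssigner does (it returns the timestamp already attached to the record, not the record itself), and `Click` is a hypothetical event type. None of this is the PoC code.

```java
final class RecordTimestampSketch {

    // Simplified stand-in for the FLIP's TimestampAssigner.
    // recordTimestamp: the timestamp already attached to the record (for example by a
    // source or by an earlier assigner), not the current processing time.
    @FunctionalInterface
    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // Passes through the record's existing timestamp, not the record itself,
    // which is why a name like RecordTimestampAssigner fits better than "Identity".
    static final class RecordTimestampAssigner<T> implements TimestampAssigner<T> {
        @Override
        public long extractTimestamp(T element, long recordTimestamp) {
            return recordTimestamp;
        }
    }

    // Hypothetical event type that carries its own event time.
    static final class Click {
        final String user;
        final long eventTime;

        Click(String user, long eventTime) {
            this.user = user;
            this.eventTime = eventTime;
        }
    }

    public static void main(String[] args) {
        Click click = new Click("alice", 1_000L);
        TimestampAssigner<Click> fromRecord = new RecordTimestampAssigner<>();
        TimestampAssigner<Click> fromField = (c, recordTimestamp) -> c.eventTime;

        System.out.println(fromRecord.extractTimestamp(click, 500L)); // 500: keeps the existing timestamp
        System.out.println(fromField.extractTimestamp(click, 500L));  // 1000: reads the event's own field
    }
}
```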
> >>>>>> On 11/05/2020 16:57, Jark Wu wrote:
> >>>>>>> Thanks for the explanation. I like the factory pattern to make the
> >>>>>>> member variables immutable and final.
> >>>>>>>
> >>>>>>> So +1 to the proposal.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jark
> >>>>>>>
> >>>>>>> On Mon, 11 May 2020 at 22:01, Stephan Ewen <[hidden email]>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> I am fine with that.
> >>>>>>>>
> >>>>>>>> Most of the principles seem agreed upon. I understand the need to
> >>>>>>>> support code-generated extractors, and we should already support most
> >>>>>>>> of it (via the factories, as Aljoscha mentioned) and can extend this
> >>>>>>>> if needed.
> >>>>>>>>
> >>>>>>>> I think that the factory approach supports code-generated extractors
> >>>>>>>> even more cleanly than an extractor with an open/init method.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek
> >>>>>>>> <[hidden email]
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> We're slightly running out of time. I would propose we vote on the
> >>>>>>>>> basic principle and remain open to later additions. This feature is
> >>>>>>>>> quite important to make the new Kafka Source that is developed as
> >>>>>>>>> part of FLIP-27 useful. Otherwise we would have to use the legacy
> >>>>>>>>> interfaces in the newly added connector.
> >>>>>>>>>
> >>>>>>>>> I know that's a bit unorthodox but would everyone be OK with what's
> >>>>>>>>> currently there and then we iterate?
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Aljoscha
> >>>>>>>>>
> >>>>>>>>> On 11.05.20 13:57, Aljoscha Krettek wrote:
> >>>>>>>>>> Ah, I meant to write this in my previous email, sorry about that.
> >>>>>>>>>>
> >>>>>>>>>> The WatermarkStrategy, which is basically a factory for a
> >>>>>>>>>> WatermarkGenerator, is the replacement for the open() method. This
> >>>>>>>>>> is the same strategy that was followed for StreamOperatorFactory,
> >>>>>>>>>> which was introduced to allow code generation in the Table API [1].
> >>>>>>>>>> If we need metrics or other things, we would add them as parameters
> >>>>>>>>>> to the factory method. What do you think?
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Aljoscha
> >>>>>>>>>>
> >>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11974
> >>>>>>>>>>
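A sketch of the factory idea, assuming simplified interfaces: `WatermarkStrategy` creates the generator from a `Context`, so anything `open()` would initialize is passed to the constructor and kept in final fields. `Context` and its `watermarkGauge()` (an `AtomicLong` standing in for a metric group) are hypothetical; the thread only says metrics could be added as a parameter of the factory method.

```java
import java.util.concurrent.atomic.AtomicLong;

final class FactorySketch {

    // Simplified stand-ins for the FLIP interfaces (not Flink's actual source).
    interface WatermarkOutput {
        void emitWatermark(long watermark);
    }

    interface WatermarkGenerator<T> {
        void onEvent(T event, long eventTimestamp, WatermarkOutput output);
        void onPeriodicEmit(WatermarkOutput output);
    }

    // Hypothetical context handed to the factory method; the AtomicLong "gauge"
    // stands in for a metric group.
    interface Context {
        AtomicLong watermarkGauge();
    }

    // The strategy is a factory: whatever open() would have set up is passed to
    // the generator's constructor instead, so the generator's fields can be final.
    @FunctionalInterface
    interface WatermarkStrategy<T> {
        WatermarkGenerator<T> createWatermarkGenerator(Context context);
    }

    // Emits the highest timestamp seen so far and reports it to the gauge.
    static final class MaxTimestampGenerator<T> implements WatermarkGenerator<T> {
        private final AtomicLong gauge; // set once by the factory, no lazy initialization
        private long maxTimestamp = Long.MIN_VALUE;

        MaxTimestampGenerator(AtomicLong gauge) {
            this.gauge = gauge;
        }

        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            if (maxTimestamp == Long.MIN_VALUE) {
                return; // no events yet
            }
            gauge.set(maxTimestamp);
            output.emitWatermark(maxTimestamp);
        }
    }

    public static void main(String[] args) {
        AtomicLong gauge = new AtomicLong(Long.MIN_VALUE);
        WatermarkStrategy<String> strategy = ctx -> new MaxTimestampGenerator<>(ctx.watermarkGauge());
        WatermarkGenerator<String> generator = strategy.createWatermarkGenerator(() -> gauge);

        generator.onEvent("a", 10L, wm -> {});
        generator.onEvent("b", 7L, wm -> {});
        generator.onPeriodicEmit(wm -> System.out.println("watermark " + wm)); // watermark 10
        System.out.println("gauge " + gauge.get());                            // gauge 10
    }
}
```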
> >>>>>>>>>> On 10.05.20 05:07, Jark Wu wrote:
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding `open()/close()`, I think it's necessary for Table & SQL
> >>>>>>>>>>> to compile the generated code. In Table & SQL, the watermark
> >>>>>>>>>>> strategy and event timestamp are defined using SQL expressions;
> >>>>>>>>>>> we translate the expressions and generate Java code for them. If
> >>>>>>>>>>> we have `open()/close()`, we don't need lazy initialization.
> >>>>>>>>>>> Besides that, I can see a need to report some metrics, e.g. the
> >>>>>>>>>>> current watermark, the dirty timestamps (null values), etc.
> >>>>>>>>>>> So I think a simple `open()/close()` with a context that can get
> >>>>>>>>>>> the MetricGroup is nice and not complex for the first version.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Jark
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Sun, 10 May 2020 at 00:50, Stephan Ewen <[hidden email]>
> >>>> wrote:
> >>>>>>>>>>>> Thanks, Aljoscha, for picking this up.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I agree with the approach of doing the set of changes proposed here
> >>>>>>>>>>>> for now. It already makes things simpler and adds idleness support
> >>>>>>>>>>>> everywhere.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Rich functions and state always add complexity; let's do this in a
> >>>>>>>>>>>> next step, if we have a really compelling case.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <
> >>>>>>>> [hidden email]>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding the WatermarkGenerator (WG) interface itself: the proposal
> >>>>>>>>>>>>> is basically to turn emitting into a "flatMap". We give the
> >>>>>>>>>>>>> WatermarkGenerator a "collector" (the WatermarkOutput), and the WG
> >>>>>>>>>>>>> can decide whether to output a watermark or not and can also mark
> >>>>>>>>>>>>> the output as idle. Changing the interface to return a Watermark
> >>>>>>>>>>>>> (as the previous watermark assigner interface did) would not allow
> >>>>>>>>>>>>> that flexibility.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding checkpointing the watermark and keeping track of the
> >>>>>>>>>>>>> minimum watermark, this would be the responsibility of the
> >>>>>>>>>>>>> framework (or the KafkaConsumer in the current implementation).
> >>>>>>>>>>>>> The user-supplied WG does not need to make sure the watermark
> >>>>>>>>>>>>> doesn't regress.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding making the WG a "rich function", I can see the potential
> >>>>>>>>>>>>> benefit, but I also see a lot of pitfalls. For example, how should
> >>>>>>>>>>>>> the watermark state be handled in the case of scale-in? It could be
> >>>>>>>>>>>>> made to work in the Kafka case by attaching the state to the
> >>>>>>>>>>>>> partition state that we keep, but then we also have potential
> >>>>>>>>>>>>> backwards compatibility problems for the WM state. Does the WG
> >>>>>>>>>>>>> usually need to keep the state, or might it be enough if the state
> >>>>>>>>>>>>> is transient? I.e., if you have a restart, the WG would lose its
> >>>>>>>>>>>>> histogram, but it would rebuild it quickly and you would get back
> >>>>>>>>>>>>> to the same steady state as before.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>>
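The "flatMap"/collector shape described above can be sketched with one interface covering both former flavours: a periodic generator emits in `onPeriodicEmit`, a punctuated one emits in `onEvent`. These are simplified stand-ins, not the PoC code; the bounded-out-of-orderness arithmetic and the marker convention (events starting with "WM") are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class UnifiedGeneratorSketch {

    // Simplified "collector": the generator decides whether to emit a watermark or go idle.
    interface WatermarkOutput {
        void emitWatermark(long watermark);
        void markIdle();
    }

    // One interface for both former flavours.
    interface WatermarkGenerator<T> {
        void onEvent(T event, long eventTimestamp, WatermarkOutput output); // punctuated logic
        void onPeriodicEmit(WatermarkOutput output);                        // periodic logic
    }

    // Periodic: remember the max timestamp, emit (max - maxDelay - 1) when called periodically.
    static final class BoundedOutOfOrderness<T> implements WatermarkGenerator<T> {
        private final long maxDelay;
        private long maxTimestamp;

        BoundedOutOfOrderness(long maxDelay) {
            this.maxDelay = maxDelay;
            // Start low enough that the first periodic emit cannot underflow.
            this.maxTimestamp = Long.MIN_VALUE + maxDelay + 1;
        }

        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(maxTimestamp - maxDelay - 1);
        }
    }

    // Punctuated: emit only when an event is a marker; nothing to do periodically.
    static final class MarkerBased implements WatermarkGenerator<String> {
        @Override
        public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
            if (event.startsWith("WM")) {
                output.emitWatermark(eventTimestamp);
            }
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {}
    }

    // Records everything a generator sends to its output.
    static final class RecordingOutput implements WatermarkOutput {
        final List<Long> watermarks = new ArrayList<>();
        boolean idle;

        @Override public void emitWatermark(long watermark) { watermarks.add(watermark); idle = false; }
        @Override public void markIdle() { idle = true; }
    }

    public static void main(String[] args) {
        RecordingOutput periodicOut = new RecordingOutput();
        WatermarkGenerator<String> bounded = new BoundedOutOfOrderness<>(5);
        bounded.onEvent("a", 100, periodicOut);
        bounded.onEvent("b", 90, periodicOut); // late event, does not lower the max
        bounded.onPeriodicEmit(periodicOut);
        System.out.println(periodicOut.watermarks); // [94]

        RecordingOutput punctuatedOut = new RecordingOutput();
        WatermarkGenerator<String> marker = new MarkerBased();
        marker.onEvent("a", 100, punctuatedOut);
        marker.onEvent("WM-1", 120, punctuatedOut);
        marker.onPeriodicEmit(punctuatedOut);
        System.out.println(punctuatedOut.watermarks); // [120]
    }
}
```

Because the generator receives the output instead of returning a value, the same interface can express "emit nothing", "emit one watermark", or "mark idle" without extra return types.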
> >>>>>>>>>>>>> On 27.04.20 12:12, David Anderson wrote:
> >>>>>>>>>>>>>> Overall I like this proposal; thanks for bringing it forward,
> >>>>>>>>>>>>>> Aljoscha.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I also like the idea of making the Watermark generator a rich
> >>>>>>>>>>>>>> function -- this should make it more straightforward to implement
> >>>>>>>>>>>>>> smarter watermark generators, e.g. one that uses state to keep
> >>>>>>>>>>>>>> statistics about the actual out-of-orderness and uses those
> >>>>>>>>>>>>>> statistics to implement a variable delay.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> David
> >>>>>>>>>>>>>>
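David's variable-delay idea, combined with Aljoscha's suggestion that transient state may be enough, could look roughly like this. As a simplification it tracks only the largest observed out-of-orderness instead of a histogram, and `AdaptiveDelayGenerator` is a hypothetical name. The statistic lives in ordinary fields, so after a restart it starts from zero and is rebuilt as events arrive.

```java
import java.util.ArrayList;
import java.util.List;

final class AdaptiveDelaySketch {

    // Simplified stand-ins for the FLIP interfaces.
    interface WatermarkOutput {
        void emitWatermark(long watermark);
    }

    interface WatermarkGenerator<T> {
        void onEvent(T event, long eventTimestamp, WatermarkOutput output);
        void onPeriodicEmit(WatermarkOutput output);
    }

    // Hypothetical generator whose delay is the largest out-of-orderness observed so far.
    // Transient state: nothing here is checkpointed.
    static final class AdaptiveDelayGenerator<T> implements WatermarkGenerator<T> {
        private long maxTimestamp = Long.MIN_VALUE;
        private long observedDelay = 0L;

        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            if (maxTimestamp != Long.MIN_VALUE && eventTimestamp < maxTimestamp) {
                // How far behind the newest event this one arrived.
                observedDelay = Math.max(observedDelay, maxTimestamp - eventTimestamp);
            }
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            if (maxTimestamp == Long.MIN_VALUE) {
                return; // nothing seen yet
            }
            output.emitWatermark(maxTimestamp - observedDelay - 1);
        }

        long currentDelay() {
            return observedDelay;
        }
    }

    public static void main(String[] args) {
        AdaptiveDelayGenerator<String> generator = new AdaptiveDelayGenerator<>();
        List<Long> emitted = new ArrayList<>();
        long[] timestamps = {100, 90, 120, 115};
        for (long ts : timestamps) {
            generator.onEvent("e", ts, emitted::add);
        }
        generator.onPeriodicEmit(emitted::add);
        System.out.println(generator.currentDelay() + " " + emitted); // 10 [109]
    }
}
```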
> >>>>>>>>>>>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <
> >>>>>>>> [hidden email]>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>> Hi Aljoscha,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for opening the discussion!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I have two comments on the FLIP:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1) We could add lifecycle methods to the Generator, i.e.
> >>>>>>>>>>>>>>> open()/close(), probably with a Context as argument. I have not
> >>>>>>>>>>>>>>> fully thought this through, but I think that this is more
> >>>>>>>>>>>>>>> aligned with the rest of our rich functions. In addition, it
> >>>>>>>>>>>>>>> would allow us, for example, to initialize the Watermark value
> >>>>>>>>>>>>>>> if we decide to checkpoint the watermark (see [1]). (I also do
> >>>>>>>>>>>>>>> not know if Table/SQL needs to do anything in the open().)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2) Aligned with the above, and with the case where we want to
> >>>>>>>>>>>>>>> checkpoint the watermark in mind, I am wondering how we could
> >>>>>>>>>>>>>>> implement this in the future. In the FLIP, it is proposed to
> >>>>>>>>>>>>>>> expose the WatermarkOutput in the methods of the
> >>>>>>>>>>>>>>> WatermarkGenerator. Given the implicit contract that watermarks
> >>>>>>>>>>>>>>> are non-decreasing, WatermarkOutput#emitWatermark() will (I
> >>>>>>>>>>>>>>> assume) have a check that compares the last emitted WM against
> >>>>>>>>>>>>>>> the provided one and emits it only if it is >=. If not, we risk
> >>>>>>>>>>>>>>> users shooting themselves in the foot if they accidentally
> >>>>>>>>>>>>>>> forget the check. Given that neither the WatermarkGenerator nor
> >>>>>>>>>>>>>>> its caller knows whether the watermark was finally emitted (the
> >>>>>>>>>>>>>>> WatermarkOutput#emitWatermark returns void), who will be
> >>>>>>>>>>>>>>> responsible for checkpointing the WM?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Given this, why not have the methods as:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> public interface WatermarkGenerator<T> {
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>     Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>     Watermark onPeriodicEmit(WatermarkOutput output);
> >>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> and the caller will be the one enforcing any invariants, such as
> >>>>>>>>>>>>>>> non-decreasing watermarks. In this way, the caller can
> >>>>>>>>>>>>>>> checkpoint anything that is needed, as it will have complete
> >>>>>>>>>>>>>>> knowledge of whether the WM was emitted or not.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Kostas
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
> >>>>>>>>>>>>>>>
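On Kostas's question of who enforces non-decreasing watermarks, Aljoscha's reply earlier in this thread puts that on the framework. Under that assumption, a hypothetical framework-side wrapper could forward only watermarks that advance and remember the last forwarded value as the state to checkpoint. This sketch forwards only strictly larger watermarks (dropping repeats), which is a design choice here rather than something the FLIP specifies.

```java
import java.util.ArrayList;
import java.util.List;

final class MonotonicOutputSketch {

    // Simplified stand-in for the FLIP's WatermarkOutput.
    interface WatermarkOutput {
        void emitWatermark(long watermark);
    }

    // Hypothetical framework-side wrapper: the user's generator writes to this output,
    // which forwards only watermarks that move event time forward and remembers the
    // last one forwarded, i.e. the value a framework could put into a checkpoint.
    static final class MonotonicOutput implements WatermarkOutput {
        private final WatermarkOutput downstream;
        private long lastEmitted;

        MonotonicOutput(WatermarkOutput downstream, long restoredWatermark) {
            this.downstream = downstream;
            this.lastEmitted = restoredWatermark; // Long.MIN_VALUE on a fresh start
        }

        @Override
        public void emitWatermark(long watermark) {
            if (watermark > lastEmitted) {
                lastEmitted = watermark;
                downstream.emitWatermark(watermark);
            }
            // Regressing or repeated watermarks are dropped, so the generator need not check.
        }

        long snapshot() {
            return lastEmitted;
        }
    }

    public static void main(String[] args) {
        List<Long> downstream = new ArrayList<>();
        MonotonicOutput output = new MonotonicOutput(downstream::add, Long.MIN_VALUE);
        output.emitWatermark(10);
        output.emitWatermark(5);  // regresses: dropped
        output.emitWatermark(12);
        System.out.println(downstream + " checkpoint=" + output.snapshot()); // [10, 12] checkpoint=12
    }
}
```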
> >>>>>>>>>>>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <
> >>>> [hidden email]
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>> Thanks for the proposal, Aljoscha. This is a very useful
> >>>>>>>>>>>>>>>> unification. We have already considered this FLIP in the
> >>>>>>>>>>>>>>>> interfaces for FLIP-95 [1] and look forward to updating to the
> >>>>>>>>>>>>>>>> new unified watermark generators once FLIP-126 has been
> >>>>>>>>>>>>>>>> accepted.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> [1] https://github.com/apache/flink/pull/11692
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> >>>>>>>>>>>>>>>>> Hi Everyone!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and
> >>>>>>>>>>>>>>>>> separate) Watermark Assigners" [1]. This work was started by
> >>>>>>>>>>>>>>>>> Stephan in an experimental branch. I expanded on that work to
> >>>>>>>>>>>>>>>>> provide a PoC for the changes proposed in this FLIP: [2].
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Currently, we have two different flavours of Watermark
> >>>>>>>>>>>>>>>>> Assigners: AssignerWithPunctuatedWatermarks and
> >>>>>>>>>>>>>>>>> AssignerWithPeriodicWatermarks. Both of them extend from
> >>>>>>>>>>>>>>>>> TimestampAssigner. This means that sources that want to support
> >>>>>>>>>>>>>>>>> watermark assignment/extraction in the source need to support
> >>>>>>>>>>>>>>>>> two separate interfaces, and we have two operator
> >>>>>>>>>>>>>>>>> implementations for the different flavours. Also, this makes
> >>>>>>>>>>>>>>>>> features such as generic support for idleness detection more
> >>>>>>>>>>>>>>>>> complicated to implement because we again have to support two
> >>>>>>>>>>>>>>>>> types of watermark assigners.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> In this FLIP we propose two things:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 1) Unify the Watermark Assigners into one interface, WatermarkGenerator
> >>>>>>>>>>>>>>>>> 2) Separate this new interface from the TimestampAssigner
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The motivation for the first is to simplify future
> >>>>>>>>>>>>>>>>> implementations and reduce code duplication. The motivation for
> >>>>>>>>>>>>>>>>> the second point is again code deduplication: most assigners
> >>>>>>>>>>>>>>>>> currently have to extend from some base timestamp extractor or
> >>>>>>>>>>>>>>>>> duplicate the extraction logic, or users have to override an
> >>>>>>>>>>>>>>>>> abstract method of the watermark assigner to provide the
> >>>>>>>>>>>>>>>>> timestamp extraction logic.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Additionally, we propose to add a generic wrapping
> >>>>>>>>>>>>>>>>> WatermarkGenerator that provides idleness detection, i.e. it
> >>>>>>>>>>>>>>>>> can mark a stream/partition as idle if no data arrives after a
> >>>>>>>>>>>>>>>>> configured timeout.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The "unify and separate" part refers to the fact that we want
> >>>>>>>>>>>>>>>>> to unify punctuated and periodic assigners but at the same time
> >>>>>>>>>>>>>>>>> split the timestamp assigner from the watermark generator.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Please find more details in the FLIP [1]. Looking forward to
> >>>>>>>>>>>>>>>>> your feedback.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>>>
> >>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> >>>>
> >>>>>>>>>>>>>>>>> [2]
> >>>> https://github.com/aljoscha/flink/tree/stephan-event-time
> >>>>>>
> >>>>
> >>
> >
>
>
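The generic idleness-detecting wrapper proposed in the original FLIP message can be sketched as a decorator around any generator. The interfaces are simplified stand-ins; `Clock`, `ManualClock`, `MaxTimestamp`, and the rule "mark idle once no event arrived for `idleTimeoutMillis`" are assumptions for illustration, not the FLIP's implementation.

```java
import java.util.ArrayList;
import java.util.List;

final class IdlenessSketch {

    // Simplified stand-ins for the FLIP interfaces.
    interface WatermarkOutput {
        void emitWatermark(long watermark);
        void markIdle();
    }

    interface WatermarkGenerator<T> {
        void onEvent(T event, long eventTimestamp, WatermarkOutput output);
        void onPeriodicEmit(WatermarkOutput output);
    }

    // Hypothetical clock so the example does not depend on wall-clock time.
    interface Clock {
        long nowMillis();
    }

    static final class ManualClock implements Clock {
        long now;
        @Override public long nowMillis() { return now; }
    }

    // Wraps any generator; if no event arrived for idleTimeoutMillis, it marks the
    // output idle instead of delegating the periodic emit.
    static final class WithIdleness<T> implements WatermarkGenerator<T> {
        private final WatermarkGenerator<T> inner;
        private final Clock clock;
        private final long idleTimeoutMillis;
        private long lastEventMillis;

        WithIdleness(WatermarkGenerator<T> inner, Clock clock, long idleTimeoutMillis) {
            this.inner = inner;
            this.clock = clock;
            this.idleTimeoutMillis = idleTimeoutMillis;
            this.lastEventMillis = clock.nowMillis();
        }

        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            lastEventMillis = clock.nowMillis();
            inner.onEvent(event, eventTimestamp, output);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            if (clock.nowMillis() - lastEventMillis >= idleTimeoutMillis) {
                output.markIdle();
            } else {
                inner.onPeriodicEmit(output);
            }
        }
    }

    // Minimal inner generator: emits the highest timestamp seen so far.
    static final class MaxTimestamp<T> implements WatermarkGenerator<T> {
        private long max = Long.MIN_VALUE;
        @Override public void onEvent(T e, long ts, WatermarkOutput out) { max = Math.max(max, ts); }
        @Override public void onPeriodicEmit(WatermarkOutput out) {
            if (max != Long.MIN_VALUE) out.emitWatermark(max);
        }
    }

    // Records what reaches the output; emitting a watermark ends idleness.
    static final class RecordingOutput implements WatermarkOutput {
        final List<Long> watermarks = new ArrayList<>();
        boolean idle;
        @Override public void emitWatermark(long wm) { watermarks.add(wm); idle = false; }
        @Override public void markIdle() { idle = true; }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        RecordingOutput out = new RecordingOutput();
        WatermarkGenerator<String> gen =
                new WithIdleness<String>(new MaxTimestamp<String>(), clock, 1_000L);

        gen.onEvent("a", 50L, out);
        clock.now = 500;   gen.onPeriodicEmit(out); // still active: emits 50
        clock.now = 1_500; gen.onPeriodicEmit(out); // 1500 ms without events: idle
        System.out.println(out.watermarks + " idle=" + out.idle); // [50] idle=true
    }
}
```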

Re: [DISCUSS] FLIP-126: Unify (and separate) Watermark Assigners

Aljoscha Krettek-2
The specifics of naming diverged a bit from the FLIP during
implementation, but that should be fine. What matters in the end is the
intention of the FLIP and that the committed code is good and
consistent in itself.

Best,
Aljoscha

On 24.05.20 05:12, Guanghui Zhang wrote:

> Hi, @Aljoscha,
> the comment for the function parameter currentTimestamp does not match
> recordTimestamp in "long extractTimestamp(T element, long recordTimestamp)"
> on the wiki.
>
> Best,
> Zhangguanghui
>
> On Wed, May 13, 2020 at 12:28 AM, Dawid Wysakowicz <[hidden email]> wrote:
>
>> Thank you for the update and sorry again for chiming in so late...
>>
>> Best,
>>
>> Dawid
>>
>>
>> On 12/05/2020 18:21, Aljoscha Krettek wrote:
>>> Yes, I am also ok with a SerializableTimestampAssigner. This only
>>> looks a bit clumsy in the API but as a user (that uses lambdas) you
>>> should not see this. I pushed changes for this to my branch:
>>> https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
>>>
>>> And yes, recordTimestamp sounds good for the TimestampAssigner. I
>>> admit I didn't read this well enough and only saw nativeTimestamp.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 12.05.20 17:16, Dawid Wysakowicz wrote:
>>>> I have similar thoughts to @Stephan
>>>>
>>>> Ad. 1 I tried something like this on your branch:
>>>>
>>>>     /**
>>>>      * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
>>>>      * For top-level classes that implement both Serializable and TimestampAssigner.
>>>>      */
>>>>     public <TA extends TimestampAssigner<T> & Serializable>
>>>>             WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner) {
>>>>         checkNotNull(timestampAssigner, "timestampAssigner");
>>>>         this.timestampAssigner = timestampAssigner;
>>>>         return this;
>>>>     }
>>>>
>>>>     @FunctionalInterface
>>>>     public interface SerializableTimestampAssigner<T>
>>>>             extends TimestampAssigner<T>, Serializable {
>>>>     }
>>>>
>>>>     /**
>>>>      * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
>>>>      * Helper method for serializable lambdas.
>>>>      */
>>>>     public WatermarkStrategies<T> withTimestampAssigner(
>>>>             SerializableTimestampAssigner<T> timestampAssigner) {
>>>>         checkNotNull(timestampAssigner, "timestampAssigner");
>>>>         this.timestampAssigner = timestampAssigner;
>>>>         return this;
>>>>     }
>>>>
>>>> But I understand if that's too hacky. It's just a pity that we must
>>>> enforce limitations on an interface that are not strictly necessary.
>>>>
>>>> Ad 2/3
>>>>
>>>> I am aware the watermark assigner/timestamp extractor can be applied
>>>> further down the graph. Originally I also wanted to suggest
>>>> sourceTimestamp and SourceTimestampAssigner, but then I realized it can
>>>> be used also after the sources as you correctly pointed out. Even if the
>>>> TimestampAssigner is used after the source, there might be some
>>>> native/record timestamp in the StreamRecord that could have been
>>>> extracted by a previous assigner.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 12/05/2020 16:47, Stephan Ewen wrote:
>>>>> @Aljoscha
>>>>>
>>>>> About (1) could we have an interface SerializableTimestampAssigner that
>>>>> simply mixes in the java.io.Serializable interface? Or will this be too
>>>>> clumsy?
>>>>>
>>>>> About (3) RecordTimeStamp seems to fit both cases (in-source-record
>>>>> timestamp, in stream-record timestamp).
>>>>>
>>>>> On Tue, May 12, 2020 at 4:12 PM Aljoscha Krettek <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Definitely +1 to point 2) raised by Dawid. I'm not sure on points
>>>>>> 1) and
>>>>>> 3).
>>>>>>
>>>>>> 1) I can see the benefit of that but in reality most timestamp
>>>>>> assigners
>>>>>> will probably need to be Serializable. If you look at my (updated) POC
>>>>>> branch [1] you can see how a TimestampAssigner would be specified
>>>>>> on the
>>>>>> WatermarkStrategies helper class: [2]. The signature of this would
>>>>>> have
>>>>>> to be changed to something like:
>>>>>>
>>>>>> public <TA extends TimestampAssigner<T> & Serializable>
>>>>>> WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner)
>>>>>>
>>>>>> Then, however, it would not be possible for users to specify a lambda
>>>>>> or an anonymous inner class for the TimestampAssigner like this:
>>>>>>
>>>>>> WatermarkStrategy<Long> testWmStrategy = WatermarkStrategies
>>>>>>                   .forGenerator(new PeriodicTestWatermarkGenerator())
>>>>>>                   .withTimestampAssigner((event, timestamp) -> event)
>>>>>>                   .build();
>>>>>>
>>>>>> 3) This makes sense if we only allow WatermarkStrategies on sources,
>>>>>> where the previous timestamp really is the "native" timestamp.
>>>>>> Currently, we also allow setting watermark strategies at arbitrary
>>>>>> points in the graph. I'm thinking we probably should only allow
>>>>>> that in
>>>>>> sources but it's not the reality currently. I'm not against
>>>>>> renaming it,
>>>>>> just voicing those thoughts.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>>
>>>>>> [1]
>>>>>>
>> https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
>>>>>> [2]
>>>>>>
>>>>>>
>> https://github.com/aljoscha/flink/blob/flink-xxx-wm-generators-rebased/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java#L81
>>>>>>
>>>>>>
>>>>>> On 12.05.20 15:48, Stephan Ewen wrote:
>>>>>>> +1 to all of Dawid's suggestions, makes a lot of sense to me
>>>>>>>
>>>>>>> On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz
>>>>>>> <[hidden email]
>>>>>>>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Aljoscha,
>>>>>>>>
>>>>>>>> Sorry for adding comments during the vote, but I have some really
>>>>>>>> minor suggestions that should not influence the voting thread, imo.
>>>>>>>>
>>>>>>>> 1) Does it make sense to have the TimestampAssigner extend from
>>>>>>>> Flink's Function? This implies it has to be serializable, which, with
>>>>>>>> the factory pattern, is not strictly necessary, right? BTW, I really
>>>>>>>> like that you suggested the FunctionalInterface annotation there.
>>>>>>>>
>>>>>>>> 2) Could we rename the IdentityTimestampAssigner to e.g.
>>>>>>>> RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...
>>>>>>>> Personally, I found the name IdentityTimestampAssigner a bit
>>>>>>>> misleading, as "identity" usually means a no-op. That did not click
>>>>>>>> for me, as I assumed it somehow returns the incoming record itself.
>>>>>>>>
>>>>>>>> 3) Could we rename the second parameter of TimestampAssigner#extract
>>>>>>>> to e.g. recordTimestamp/nativeTimestamp? This is similar to the point
>>>>>>>> above. This parameter was also a bit confusing for me, as at times I
>>>>>>>> thought it was somehow related to
>>>>>>>> TimerService#currentProcessingTimestamp()/currentWatermark(), i.e. the
>>>>>>>> system-wide current timestamp.
>>>>>>>>
>>>>>>>> Other than those three points I like the proposal, and I would have
>>>>>>>> voted +1 if it were not for them.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Dawid
>>>>>>>>
>>>>>>>> On 11/05/2020 16:57, Jark Wu wrote:
>>>>>>>>> Thanks for the explanation. I like the factory pattern to make the
>>>>>>>>> member variables immutable and final.
>>>>>>>>>
>>>>>>>>> So +1 to the proposal.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>> On Mon, 11 May 2020 at 22:01, Stephan Ewen <[hidden email]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I am fine with that.
>>>>>>>>>>
>>>>>>>>>> Most of the principles seem agreed upon. I understand the need to
>>>>>>>>>> support code-generated extractors, and we should already support most
>>>>>>>>>> of it (via the factories, as Aljoscha mentioned) and can extend this
>>>>>>>>>> if needed.
>>>>>>>>>>
>>>>>>>>>> I think that the factory approach supports code-generated extractors
>>>>>>>>>> even more cleanly than an extractor with an open/init method.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek
>>>>>>>>>> <[hidden email]
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> We're slightly running out of time. I would propose we vote on
>>>>>>>>>>> the
>>>>>>>> basic
>>>>>>>>>>> principle and remain open to later additions. This feature is
>>>>>>>>>>> quite
>>>>>>>>>>> important to make the new Kafka Source that is developed as
>>>>>>>>>>> part of
>>>>>>>>>>> FLIP-27 useful. Otherwise we would have to use the legacy
>>>>>>>>>>> interfaces
>>>>>> in
>>>>>>>>>>> the newly added connector.
>>>>>>>>>>>
>>>>>>>>>>> I know that's a bit unorthodox but would everyone be OK with
>>>>>>>>>>> what's
>>>>>>>>>>> currently there and then we iterate?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Aljoscha
>>>>>>>>>>>
>>>>>>>>>>> On 11.05.20 13:57, Aljoscha Krettek wrote:
>>>>>>>>>>>> Ah, I meant to write this in my previous email, sorry about
>>>>>>>>>>>> that.
>>>>>>>>>>>>
>>>>>>>>>>>> The WatermarkStrategy, which is basically a factory for a
>>>>>>>>>>>> WatermarkGenerator is the replacement for the open() method.
>>>>>>>>>>>> This is
>>>>>>>>>> the
>>>>>>>>>>>> same strategy that was followed for StreamOperatorFactory,
>>>>>>>>>>>> which was
>>>>>>>>>>>> introduced to allow code generation in the Table API [1]. If
>>>>>>>>>>>> we need
>>>>>>>>>>>> metrics or other things we would add that as a parameter to the
>>>>>>>> factory
>>>>>>>>>>>> method. What do you think?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11974
>>>>>>>>>>>>
>>>>>>>>>>>> On 10.05.20 05:07, Jark Wu wrote:
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding `open()/close()`, I think it's necessary for Table & SQL
>>>>>>>>>>>>> to compile the generated code. In Table & SQL, the watermark
>>>>>>>>>>>>> strategy and event timestamp are defined using SQL expressions;
>>>>>>>>>>>>> we translate the expressions and generate Java code for them. If
>>>>>>>>>>>>> we have `open()/close()`, we don't need lazy initialization.
>>>>>>>>>>>>> Besides that, I can see a need to report some metrics, e.g. the
>>>>>>>>>>>>> current watermark, the dirty timestamps (null values), etc.
>>>>>>>>>>>>> So I think a simple `open()/close()` with a context that can get
>>>>>>>>>>>>> the MetricGroup is nice and not complex for the first version.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, 10 May 2020 at 00:50, Stephan Ewen <[hidden email]>
>>>>>> wrote:
>>>>>>>>>>>>>> Thanks, Aljoscha, for picking this up.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I agree with the approach of doing the here proposed set of
>>>>>> changes
>>>>>>>>>> for
>>>>>>>>>>>>>> now. It already makes things simpler and adds idleness support
>>>>>>>>>>>>>> everywhere.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Rich functions and state always add complexity, let's do
>>>>>>>>>>>>>> this in a
>>>>>>>>>> next
>>>>>>>>>>>>>> step, if we have a really compelling case.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <
>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding the WatermarkGenerator (WG) interface itself. The
>>>>>>>> proposal
>>>>>>>>>>> is
>>>>>>>>>>>>>>> basically to turn emitting into a "flatMap", we give the
>>>>>>>>>>>>>>> WatermarkGenerator a "collector" (the WatermarkOutput) and
>>>>>>>>>>>>>>> the WG
>>>>>>>>>> can
>>>>>>>>>>>>>>> decide whether to output a watermark or not and can also
>>>>>>>>>>>>>>> mark the
>>>>>>>>>>>>>>> output
>>>>>>>>>>>>>>> as idle. Changing the interface to return a Watermark (as the
>>>>>>>>>> previous
>>>>>>>>>>>>>>> watermark assigner interface did) would not allow that
>>>>>> flexibility.
>>>>>>>>>>>>>>> Regarding checkpointing the watermark and keeping track of
>>>>>>>>>>>>>>> the
>>>>>>>>>> minimum
>>>>>>>>>>>>>>> watermark, this would be the responsibility of the
>>>>>>>>>>>>>>> framework (or
>>>>>>>> the
>>>>>>>>>>>>>>> KafkaConsumer in the current implementation). The
>>>>>>>>>>>>>>> user-supplied
>>>>>> WG
>>>>>>>>>>> does
>>>>>>>>>>>>>>> not need to make sure the watermark doesn't regress.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding making the WG a "rich function", I can see the potential
>>>>>>>>>>>>>>> benefit, but I also see a lot of pitfalls. For example, how should
>>>>>>>>>>>>>>> the watermark state be handled in the case of scale-in? It could be
>>>>>>>>>>>>>>> made to work in the Kafka case by attaching the state to the
>>>>>>>>>>>>>>> partition state that we keep, but then we also have potential
>>>>>>>>>>>>>>> backwards compatibility problems for the WM state. Does the WG
>>>>>>>>>>>>>>> usually need to keep the state, or might it be enough if the state
>>>>>>>>>>>>>>> is transient? That is, after a restart the WG would lose its
>>>>>>>>>>>>>>> histogram, but it would rebuild it quickly and you would get back
>>>>>>>>>>>>>>> to the same steady state as before.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 27.04.20 12:12, David Anderson wrote:
>>>>>>>>>>>>>>>> Overall I like this proposal; thanks for bringing it forward,
>>>>>>>>>>>>>>>> Aljoscha.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I also like the idea of making the Watermark generator a rich
>>>>>>>>>>>>>>>> function; this should make it more straightforward to implement
>>>>>>>>>>>>>>>> smarter watermark generators, e.g. one that uses state to keep
>>>>>>>>>>>>>>>> statistics about the actual out-of-orderness and uses those
>>>>>>>>>>>>>>>> statistics to implement a variable delay.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> David
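
A generator with a variable delay, kept as transient state in the way Aljoscha's question suggests, could look roughly like the sketch below. It assumes simplified, hypothetical stand-ins for the thread's WatermarkGenerator/WatermarkOutput types (declared here so the block stands alone). The windowed-quantile heuristic and the name AdaptiveDelayGenerator are illustrative choices, not something proposed in the thread.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical, simplified stand-ins for the types named in this thread.
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);

    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

/**
 * Variable delay derived from recently observed out-of-orderness. The samples
 * are kept only in plain fields, i.e. transient: after a restart they are lost
 * and rebuilt from the events that follow.
 */
final class AdaptiveDelayGenerator<T> implements WatermarkGenerator<T> {
    private final int windowSize;   // how many recent lateness samples to keep
    private final double quantile;  // e.g. 0.99: the delay covers 99% of the samples
    private final Deque<Long> lateness = new ArrayDeque<>();
    private long maxTimestamp = Long.MIN_VALUE;

    AdaptiveDelayGenerator(int windowSize, double quantile) {
        this.windowSize = windowSize;
        this.quantile = quantile;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (eventTimestamp >= maxTimestamp) {
            maxTimestamp = eventTimestamp;
            record(0L); // in order
        } else {
            record(maxTimestamp - eventTimestamp); // how far behind the maximum
        }
    }

    private void record(long sample) {
        if (lateness.size() == windowSize) {
            lateness.removeFirst(); // sliding window: drop the oldest sample
        }
        lateness.addLast(sample);
    }

    /** The configured quantile of the recorded lateness samples. */
    long currentDelay() {
        if (lateness.isEmpty()) {
            return 0L;
        }
        long[] sorted = lateness.stream().mapToLong(Long::longValue).toArray();
        Arrays.sort(sorted);
        int index = (int) Math.ceil(quantile * sorted.length) - 1;
        return sorted[Math.max(0, Math.min(index, sorted.length - 1))];
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(new Watermark(maxTimestamp - currentDelay()));
        }
    }
}
```

Since the delay can grow, the emitted watermark may move backwards; in the design described earlier in this thread, it is the framework, not the generator, that keeps watermarks from regressing.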
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <[hidden email]> wrote:
>>>>>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for opening the discussion!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have two comments on the FLIP:
>>>>>>>>>>>>>>>>> 1) We could add lifecycle methods to the Generator, i.e.
>>>>>>>>>>>>>>>>> open()/close(), probably with a Context as argument. I have not
>>>>>>>>>>>>>>>>> fully thought this through, but I think that this is more aligned
>>>>>>>>>>>>>>>>> with the rest of our rich functions. In addition, it would allow
>>>>>>>>>>>>>>>>> us, for example, to initialize the watermark value if we decide
>>>>>>>>>>>>>>>>> to checkpoint the watermark (see [1]). (I also do not know if
>>>>>>>>>>>>>>>>> Table/SQL needs to do anything in the open().)
>>>>>>>>>>>>>>>>> 2) Aligned with the above, and with the case where we want to
>>>>>>>>>>>>>>>>> checkpoint the watermark in mind, I am wondering how we could
>>>>>>>>>>>>>>>>> implement this in the future. In the FLIP, it is proposed to
>>>>>>>>>>>>>>>>> expose the WatermarkOutput in the methods of the
>>>>>>>>>>>>>>>>> WatermarkGenerator. Given that there is an implicit contract that
>>>>>>>>>>>>>>>>> watermarks are non-decreasing, WatermarkOutput#emitWatermark()
>>>>>>>>>>>>>>>>> will (I assume) have a check that compares the last emitted WM
>>>>>>>>>>>>>>>>> against the provided one and emits it only if it is >=. If not,
>>>>>>>>>>>>>>>>> then we risk users shooting themselves in the foot if they
>>>>>>>>>>>>>>>>> accidentally forget the check. Given that the WatermarkGenerator
>>>>>>>>>>>>>>>>> and its caller do not know if the watermark was finally emitted
>>>>>>>>>>>>>>>>> or not (WatermarkOutput#emitWatermark returns void), who will be
>>>>>>>>>>>>>>>>> responsible for checkpointing the WM?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Given this, why not have the methods as:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> public interface WatermarkGenerator<T> {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         Watermark onPeriodicEmit(WatermarkOutput output);
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> and the caller will be the one enforcing any invariants, such as
>>>>>>>>>>>>>>>>> non-decreasing watermarks. In this way, the caller can checkpoint
>>>>>>>>>>>>>>>>> anything that is needed, as it will have complete knowledge of
>>>>>>>>>>>>>>>>> whether the WM was emitted or not.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
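
Kostas's alternative, in which the generator returns a candidate watermark and the caller enforces the invariant, can be sketched as follows. All types here, including the WatermarkEmitter caller, are hypothetical; the WatermarkOutput parameter from the proposed signatures is omitted for brevity, and null is assumed to mean "no watermark".

```java
import java.util.function.LongConsumer;

// Hypothetical types sketching the alternative proposed above; not a Flink API.
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

/** The generator returns a candidate watermark, or null for "nothing to emit". */
interface ReturningWatermarkGenerator<T> {
    Watermark onEvent(T event, long eventTimestamp);

    Watermark onPeriodicEmit();
}

/**
 * Caller side: enforces the non-decreasing invariant itself, so it always knows
 * which watermark was actually emitted last, i.e. the value to checkpoint.
 */
final class WatermarkEmitter<T> {
    private final ReturningWatermarkGenerator<T> generator;
    private final LongConsumer downstream;
    private long lastEmitted = Long.MIN_VALUE;

    WatermarkEmitter(ReturningWatermarkGenerator<T> generator, LongConsumer downstream) {
        this.generator = generator;
        this.downstream = downstream;
    }

    void processEvent(T event, long eventTimestamp) {
        maybeEmit(generator.onEvent(event, eventTimestamp));
    }

    void onPeriodicTimer() {
        maybeEmit(generator.onPeriodicEmit());
    }

    private void maybeEmit(Watermark candidate) {
        // Forward only watermarks that advance; drop nulls and regressions.
        if (candidate != null && candidate.timestamp > lastEmitted) {
            lastEmitted = candidate.timestamp;
            downstream.accept(lastEmitted);
        }
    }

    /** The value that would go into a checkpoint. */
    long snapshotWatermark() {
        return lastEmitted;
    }

    /** Restores the starting point of the invariant after recovery. */
    void restoreWatermark(long watermark) {
        lastEmitted = watermark;
    }
}
```

The trade-off discussed in the thread is visible here: the caller gains exact knowledge of what was emitted, but a generator can no longer emit several watermarks, or an idleness signal, from a single call without the extra output parameter.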
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <[hidden email]> wrote:
>>>>>>>>>>>>>>>>>> Thanks for the proposal, Aljoscha. This is a very useful
>>>>>>>>>>>>>>>>>> unification. We have already considered this FLIP in the
>>>>>>>>>>>>>>>>>> interfaces for FLIP-95 [1] and look forward to updating to the
>>>>>>>>>>>>>>>>>> new unified watermark generators once FLIP-126 has been accepted.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> [1] https://github.com/apache/flink/pull/11692
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
>>>>>>>>>>>>>>>>>>> Hi Everyone!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and
>>>>>>>>>>>>>>>>>>> separate) Watermark Assigners" [1]. This work was started by
>>>>>>>>>>>>>>>>>>> Stephan in an experimental branch. I expanded on that work to
>>>>>>>>>>>>>>>>>>> provide a PoC for the changes proposed in this FLIP: [2].
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Currently, we have two different flavours of Watermark Assigners:
>>>>>>>>>>>>>>>>>>> AssignerWithPunctuatedWatermarks and
>>>>>>>>>>>>>>>>>>> AssignerWithPeriodicWatermarks. Both of them extend from
>>>>>>>>>>>>>>>>>>> TimestampAssigner. This means that sources that want to support
>>>>>>>>>>>>>>>>>>> watermark assignment/extraction in the source need to support two
>>>>>>>>>>>>>>>>>>> separate interfaces, and we have two operator implementations for
>>>>>>>>>>>>>>>>>>> the different flavours. Also, this makes features such as generic
>>>>>>>>>>>>>>>>>>> support for idleness detection more complicated to implement
>>>>>>>>>>>>>>>>>>> because we again have to support two types of watermark
>>>>>>>>>>>>>>>>>>> assigners.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In this FLIP we propose two things:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1) Unify the Watermark Assigners into one interface,
>>>>>>>>>>>>>>>>>>>    WatermarkGenerator.
>>>>>>>>>>>>>>>>>>> 2) Separate this new interface from the TimestampAssigner.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The motivation for the first is to simplify future
>>>>>>>>>>>>>>>>>>> implementations and reduce code duplication. The motivation for
>>>>>>>>>>>>>>>>>>> the second point is again code deduplication: most assigners
>>>>>>>>>>>>>>>>>>> currently have to extend from some base timestamp extractor or
>>>>>>>>>>>>>>>>>>> duplicate the extraction logic, or users have to override an
>>>>>>>>>>>>>>>>>>> abstract method of the watermark assigner to provide the
>>>>>>>>>>>>>>>>>>> timestamp extraction logic.
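
A minimal sketch of what "unify and separate" could mean for an operator, assuming simplified, hypothetical types: a single WatermarkGenerator interface covers both the punctuated case (emit in onEvent) and the periodic case (emit in onPeriodicEmit), and timestamp extraction lives in a separate TimestampAssigner, so one operator class can combine any assigner with any generator. The single-argument extractTimestamp and all class names here are assumptions for illustration, not the Flink API.

```java
// Hypothetical, simplified types; not the actual Flink interfaces.
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);

    void markIdle();
}

/** One interface for both flavours. */
interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

/** Timestamp extraction, separated from watermark generation. */
@FunctionalInterface
interface TimestampAssigner<T> {
    long extractTimestamp(T element);
}

/** "Punctuated" behaviour: emit on marker events, do nothing on the timer. */
final class MarkerEventGenerator implements WatermarkGenerator<String> {
    @Override
    public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
        if (event.startsWith("WM")) {
            output.emitWatermark(new Watermark(eventTimestamp));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // nothing to do for a purely punctuated generator
    }
}

/** A single operator drives any assigner/generator pair. */
final class TimestampsAndWatermarksOperator<T> {
    private final TimestampAssigner<T> assigner;
    private final WatermarkGenerator<T> generator;
    private final WatermarkOutput output;

    TimestampsAndWatermarksOperator(
            TimestampAssigner<T> assigner, WatermarkGenerator<T> generator, WatermarkOutput output) {
        this.assigner = assigner;
        this.generator = generator;
        this.output = output;
    }

    /** Returns the timestamp the record would be forwarded with. */
    long processElement(T element) {
        long timestamp = assigner.extractTimestamp(element);
        generator.onEvent(element, timestamp, output);
        return timestamp;
    }

    /** Called on a processing-time timer, e.g. every auto-watermark interval. */
    void onPeriodicTimer() {
        generator.onPeriodicEmit(output);
    }
}
```

The same assigner (say, one that parses "name@timestamp" strings) can then be paired with a punctuated generator like MarkerEventGenerator or with any periodic one, without a second operator implementation.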
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Additionally, we propose to add a generic wrapping
>>>>>>>>>>>>>>>>>>> WatermarkGenerator that provides idleness detection, i.e. it can
>>>>>>>>>>>>>>>>>>> mark a stream/partition as idle if no data arrives within a
>>>>>>>>>>>>>>>>>>> configured timeout.
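
One way such an idleness-detecting wrapper could work, sketched in Java under stated assumptions: the types are simplified, hypothetical stand-ins; processing time comes from an injectable LongSupplier (in production this could be System::currentTimeMillis); and emitting a watermark is assumed to mark the output active again. IdlenessDetectingGenerator is an illustrative name, not the proposed implementation.

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified types; not the actual Flink interfaces.
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);

    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

/**
 * Wraps any generator and marks the output idle once no event has arrived for
 * idleTimeoutMillis of processing time.
 */
final class IdlenessDetectingGenerator<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> delegate;
    private final long idleTimeoutMillis;
    private final LongSupplier clock; // processing time in ms, injectable for tests
    private long lastEventTime;
    private boolean idle = false;

    IdlenessDetectingGenerator(
            WatermarkGenerator<T> delegate, long idleTimeoutMillis, LongSupplier clock) {
        this.delegate = delegate;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.clock = clock;
        this.lastEventTime = clock.getAsLong();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastEventTime = clock.getAsLong();
        idle = false;
        delegate.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (clock.getAsLong() - lastEventTime >= idleTimeoutMillis) {
            if (!idle) { // signal only once per idle period
                idle = true;
                output.markIdle();
            }
        } else {
            delegate.onPeriodicEmit(output);
        }
    }
}
```

Because the wrapper only delegates, the same idleness logic applies to punctuated and periodic generators alike, which is what a single generator interface makes possible.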
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The "unify and separate" part refers to the fact that we want to
>>>>>>>>>>>>>>>>>>> unify punctuated and periodic assigners but at the same time
>>>>>>>>>>>>>>>>>>> split the timestamp assigner from the watermark generator.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Please find more details in the FLIP [1]. Looking forward to your
>>>>>>>>>>>>>>>>>>> feedback.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
>>>>>>>>>>>>>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time