[DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek-2
Hi Folks,
as part of my effort to improve the windowing in Flink [1] I also thought
about lateness, accumulating/discarding and window cleanup. I have some
ideas on this but I would love to get feedback from the community as I
think that these things are important for everyone doing event-time
windowing on Flink.

The basic problem is this: Some elements can arrive behind the watermark if
the watermark is not 100 % correct (which it is not, in most cases, I would
assume). We need to provide an API that lets users specify what happens when
these late elements arrive. There are two main knobs for the user here:

- Allowed Lateness: How late can an element be before it is completely
ignored, i.e. simply discarded?

- Accumulating/Discarding Fired Windows: When we fire a window, do we purge
the contents or do we keep it around until the watermark passes the end of
the window plus the allowed lateness? If we keep the window, a late element
will be added to the window and the window will be emitted again. If we don't
keep the window, then the late element will essentially trigger emission of
a one-element window.

This is somewhat straightforward to implement: If accumulating, set a timer
for the end of the window plus the allowed lateness. Clean up the window
when that fires (basically). All in event-time with watermarks.
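
To make the two knobs concrete, here is a minimal plain-Java sketch of a single tumbling event-time window operator. It is not Flink code: the class `LatenessSketch`, its methods, and the rule that a window fires once the watermark is at or past its end are all simplifying assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of tumbling event-time windows with allowed lateness; not Flink code. */
class LatenessSketch {
    enum Mode { ACCUMULATING, DISCARDING }

    private final long windowSize;
    private final long allowedLateness;
    private final Mode mode;
    private long watermark = Long.MIN_VALUE;
    // Window start -> buffered elements (the window state).
    private final Map<Long, List<Integer>> state = new TreeMap<>();
    // One entry per emission: the window contents at the time of firing.
    final List<List<Integer>> emitted = new ArrayList<>();

    LatenessSketch(long windowSize, long allowedLateness, Mode mode) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
        this.mode = mode;
    }

    void onElement(long timestamp, int value) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        long end = start + windowSize;
        if (watermark >= end + allowedLateness) {
            return; // Later than the allowed lateness: discard the element.
        }
        state.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
        if (watermark >= end) {
            fire(start); // Late but allowed: the window is emitted again.
        }
    }

    void onWatermark(long newWatermark) {
        long previous = watermark;
        watermark = Math.max(watermark, newWatermark);
        for (Long start : new ArrayList<>(state.keySet())) {
            long end = start + windowSize;
            if (previous < end && watermark >= end) {
                fire(start); // Regular on-time firing.
            }
            if (watermark >= end + allowedLateness) {
                state.remove(start); // Cleanup timer: drop the window state.
            }
        }
    }

    private void fire(long start) {
        List<Integer> contents = state.get(start);
        if (contents == null || contents.isEmpty()) {
            return;
        }
        emitted.add(new ArrayList<>(contents));
        if (mode == Mode.DISCARDING) {
            contents.clear(); // Purge on fire.
        }
    }

    /** Runs the same input in the given mode and returns all emissions. */
    static List<List<Integer>> run(Mode mode) {
        LatenessSketch op = new LatenessSketch(10, 5, mode);
        op.onElement(1, 1);
        op.onElement(2, 2);
        op.onWatermark(10); // End of window [0, 10): first firing.
        op.onElement(3, 3); // Late, but within the allowed lateness of 5.
        op.onWatermark(15); // End plus allowed lateness: state is cleaned up.
        op.onElement(4, 4); // Too late: dropped.
        return op.emitted;
    }

    public static void main(String[] args) {
        System.out.println("accumulating: " + run(Mode.ACCUMULATING));
        System.out.println("discarding:   " + run(Mode.DISCARDING));
    }
}
```

In this sketch the late element re-emits the full window contents in accumulating mode, while in discarding mode it produces the one-element window described above.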

My problem is only this: what should happen if the user specifies some
allowed lateness and/or accumulating mode but uses processing-time
windowing? For processing-time windows these don't make sense because
elements cannot be late by definition. The problem is that we cannot
figure out, by looking at a WindowAssigner or the Windows that it assigns
to elements, whether these windows are in the event-time or processing-time
domain. At the API level this is also not easily visible, since a user
might have set the "stream-time-characteristic" to event-time but still use
a processing-time window (plus trigger) in the program.

Any ideas for solving this are extremely welcome. :-)

Cheers,
Aljoscha

[1]
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.psfzjlv68tp

Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek-2
By the way, the way I see to fix this is to extend WindowAssigner with
an "isEventTime()" method and then allow accumulating/lateness in the
WindowOperator only if this is true.

But it seems a bit hacky because it special-cases event-time. But then
again, maybe we need to special-case it ...
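
A rough sketch of what such a check could look like, in plain Java. The types below (`SketchWindowAssigner`, `SketchWindowOperator` and the two assigner classes) are hypothetical stand-ins invented for this example, not the actual Flink classes, and rejecting the configuration with an exception is only one possible policy.

```java
/** Hypothetical assigner contract carrying the proposed flag; not the Flink interface. */
interface SketchWindowAssigner {
    long windowStart(long timestamp);

    /** True if the assigned windows live in the event-time domain. */
    boolean isEventTime();
}

class TumblingEventTimeSketch implements SketchWindowAssigner {
    private final long size;

    TumblingEventTimeSketch(long size) {
        this.size = size;
    }

    public long windowStart(long timestamp) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    public boolean isEventTime() {
        return true;
    }
}

class TumblingProcessingTimeSketch implements SketchWindowAssigner {
    private final long size;

    TumblingProcessingTimeSketch(long size) {
        this.size = size;
    }

    public long windowStart(long timestamp) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    public boolean isEventTime() {
        return false;
    }
}

/** Hypothetical operator that only accepts lateness for event-time assigners. */
class SketchWindowOperator {
    final SketchWindowAssigner assigner;
    final long allowedLateness;

    SketchWindowOperator(SketchWindowAssigner assigner, long allowedLateness) {
        if (allowedLateness > 0 && !assigner.isEventTime()) {
            throw new IllegalArgumentException(
                    "Allowed lateness only makes sense for event-time windows");
        }
        this.assigner = assigner;
        this.allowedLateness = allowedLateness;
    }

    public static void main(String[] args) {
        new SketchWindowOperator(new TumblingEventTimeSketch(10), 5); // accepted
        try {
            new SketchWindowOperator(new TumblingProcessingTimeSketch(10), 5);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The flag is self-declared by the assigner, so nothing in this sketch verifies that an assigner returning `true` actually uses event time.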

On Tue, 5 Apr 2016 at 12:23 Aljoscha Krettek <[hidden email]> wrote:

> Hi Folks,
> as part of my effort to improve the windowing in Flink [1] I also thought
> about lateness, accumulating/discarding and window cleanup. I have some
> ideas on this but I would love to get feedback from the community as I
> think that these things are important for everyone doing event-time
> windowing on Flink.
>
> The basic problem is this: Some elements can arrive behind the watermark
> if the watermark is not 100 % correct (which it is not, in most cases, I
> would assume). We need to provide an API that lets users specify what happens
> when these late elements arrive. There are two main knobs for the user here:
>
> - Allowed Lateness: How late can an element be before it is completely
> ignored, i.e. simply discarded?
>
> - Accumulating/Discarding Fired Windows: When we fire a window, do we
> purge the contents or do we keep it around until the watermark passes the
> end of the window plus the allowed lateness? If we keep the window, a late
> element will be added to the window and the window will be emitted again.
> If we don't keep the window, then the late element will essentially trigger
> emission of a one-element window.
>
> This is somewhat straightforward to implement: If accumulating, set a timer
> for the end of the window plus the allowed lateness. Clean up the window
> when that fires (basically). All in event-time with watermarks.
>
> My problem is only this: what should happen if the user specifies some
> allowed lateness and/or accumulating mode but uses processing-time
> windowing? For processing-time windows these don't make sense because
> elements cannot be late by definition. The problem is that we cannot
> figure out, by looking at a WindowAssigner or the Windows that it assigns
> to elements whether these windows are in event-time or processing-time
> domain. At the API level this is also not easily visible, since a user
> might have set the "stream-time-characteristic" to event-time but still use
> a processing-time window (plus trigger) in the program.
>
> Any ideas for solving this are extremely welcome. :-)
>
> Cheers,
> Aljoscha
>
> [1]
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.psfzjlv68tp
>

Re: [DISCUSS] Allowed Lateness in Flink

mxm
Hi Aljoscha,

Thank you for the detailed design document.

Wouldn't it be ok to allow these new concepts regardless of the time
semantics? For Event Time and Ingestion Time "Lateness" and
"Accumulating/Discarding" make sense. If the user chooses Processing
time then these can be ignored during translation of the StreamGraph
(possibly with a warning).

Detecting when these concepts make sense should be possible by
checking the "Stream Characteristics" of the ExecutionEnvironment or
the involved classes (e.g. SlidingProcessingTimeWindows) in the
StreamGraph. If the user uses a custom WindowAssigner then the user
has to take care that it is used correctly. I don't like the
"isEventTime()" method. Even with the additional method, users could
return 'true' there although they meant 'false', right? So this does
not really solve the problem that it is hard to distinguish Event Time
and Processing Time semantics in Flink.

Another approach that I could think of is getting rid of
'System.currentTimeMillis()' and only allowing time to be obtained via a
special interface that WindowAssigners implement. Then we could determine
what time is assigned and also verify that it is actually used (in contrast
to the isEventTime() method). Would that be an option or would it
break the API?
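
One possible reading of this idea, sketched in plain Java. Everything here is hypothetical (`RecordingTimeContext`, `ClockedAssigner`, `TimeDomainProbe` are names made up for the example): the assigner can only reach time through a context object, and the context records which clock was actually consulted.

```java
import java.util.EnumSet;

/** Hypothetical time accessor that remembers which clocks were consulted. */
class RecordingTimeContext {
    enum Domain { EVENT_TIME, PROCESSING_TIME }

    private final long elementTimestamp;
    private final long processingTime;
    private final EnumSet<Domain> used = EnumSet.noneOf(Domain.class);

    RecordingTimeContext(long elementTimestamp, long processingTime) {
        this.elementTimestamp = elementTimestamp;
        this.processingTime = processingTime;
    }

    long elementTimestamp() {
        used.add(Domain.EVENT_TIME);
        return elementTimestamp;
    }

    long currentProcessingTime() {
        used.add(Domain.PROCESSING_TIME);
        return processingTime;
    }

    EnumSet<Domain> usedDomains() {
        return EnumSet.copyOf(used);
    }
}

/** Hypothetical assigner contract: time is only reachable through the context. */
interface ClockedAssigner {
    long windowStart(RecordingTimeContext time);
}

class TimeDomainProbe {
    /** Calls the assigner once and reports which time domains it touched. */
    static EnumSet<RecordingTimeContext.Domain> probe(ClockedAssigner assigner) {
        RecordingTimeContext context = new RecordingTimeContext(1_000L, 5_000L);
        assigner.windowStart(context);
        return context.usedDomains();
    }

    public static void main(String[] args) {
        ClockedAssigner eventTime = t -> t.elementTimestamp() / 100 * 100;
        ClockedAssigner processingTime = t -> t.currentProcessingTime() / 100 * 100;
        System.out.println(probe(eventTime));
        System.out.println(probe(processingTime));
    }
}
```

A single probe call only observes what the assigner did for that one call, so this is an observation mechanism rather than a guarantee about the assigner's behavior.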

Cheers,
Max

On Tue, Apr 5, 2016 at 12:29 PM, Aljoscha Krettek <[hidden email]> wrote:

> By the way, the way I see to fix this is to extend WindowAssigner with
> an "isEventTime()" method and then allow accumulating/lateness in the
> WindowOperator only if this is true.
>
> But it seems a bit hacky because it special-cases event-time. But then
> again, maybe we need to special-case it ...

Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek-2
Hi Max,
thanks for the feedback and suggestions! I'll try to address each
paragraph separately.

I'm afraid deciding based on the "StreamTimeCharacteristic" is not possible
since a user can use processing-time windows in their job even though they
set the characteristic to event-time. Enabling event time does not disable
processing time, it just enables an additional feature. (IMHO, the handling
of the StreamTimeCharacteristic is still somewhat problematic.)

Making the decision based purely on the class of the WindowAssigner is also
not possible since we don't know in advance which WindowAssigners the users
will write and what time characteristic they will use.

Regarding the third proposition: removing 'System.currentTimeMillis()' is
very desirable and part of my proposal. However, it is still meant to be
separate from "event-time" since a Trigger/WindowAssigner might need both.
For example, a Trigger might want to do early triggering a few
(processing-time) seconds after the first elements arrive and proper
triggering once the watermark for the end of the window arrives.
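
The mixed-domain trigger from this example could be sketched as follows. This is plain Java with a made-up class `EarlyAndOnTimeTriggerSketch` and made-up callback names, not the Flink Trigger API; it assumes a single early firing and that the window fires on time once the watermark is at or past the window end.

```java
/** Hypothetical trigger for one window that uses both time domains; not Flink code. */
class EarlyAndOnTimeTriggerSketch {
    enum Result { CONTINUE, FIRE }

    private final long windowEnd;        // event time
    private final long earlyDelayMillis; // processing time
    private Long earlyDeadline = null;   // processing-time timer, null if not set
    private boolean earlyFired = false;
    private boolean onTimeFired = false;

    EarlyAndOnTimeTriggerSketch(long windowEnd, long earlyDelayMillis) {
        this.windowEnd = windowEnd;
        this.earlyDelayMillis = earlyDelayMillis;
    }

    /** The first element arms a processing-time timer for the early firing. */
    Result onElement(long processingTimeNow) {
        if (earlyDeadline == null && !earlyFired && !onTimeFired) {
            earlyDeadline = processingTimeNow + earlyDelayMillis;
        }
        return Result.CONTINUE;
    }

    /** Early firing, driven by the processing-time clock. */
    Result onProcessingTime(long processingTimeNow) {
        if (earlyDeadline != null && processingTimeNow >= earlyDeadline) {
            earlyDeadline = null;
            earlyFired = true;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    /** Proper firing, driven by the event-time watermark. */
    Result onWatermark(long watermark) {
        if (!onTimeFired && watermark >= windowEnd) {
            onTimeFired = true;
            earlyDeadline = null; // The early timer is no longer needed.
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        EarlyAndOnTimeTriggerSketch trigger = new EarlyAndOnTimeTriggerSketch(10_000, 2_000);
        System.out.println(trigger.onElement(1_000));
        System.out.println(trigger.onProcessingTime(3_000));
        System.out.println(trigger.onWatermark(10_000));
    }
}
```

Because one trigger consults both clocks, the clock an assigner or trigger happens to read does not by itself tell the operator which domain the windows belong to.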

These are good ideas but I'm afraid we still don't have a good solution.
This whole processing time/event time business is just very tricky.

Cheers,
Aljoscha

On Tue, 26 Apr 2016 at 16:26 Maximilian Michels <[hidden email]> wrote:

> Hi Aljoscha,
>
> Thank you for the detailed design document.
>
> Wouldn't it be ok to allow these new concepts regardless of the time
> semantics? For Event Time and Ingestion Time "Lateness" and
> "Accumulating/Discarding" make sense. If the user chooses Processing
> time then these can be ignored during translation of the StreamGraph
> (possibly with a warning).
>
> Detecting when these concepts make sense should be possible by
> checking the "Stream Characteristics" of the ExecutionEnvironment or
> the involved classes (e.g. SlidingProcessingTimeWindows) in the
> StreamGraph. If the user uses a custom WindowAssigner then the user
> has to take care that it is used correctly. I don't like the
> "isEventTime()" method. Even with the additional method, users could
> return 'true' there although they meant 'false', right? So this does
> not really solve the problem that it is hard to distinguish Event Time
> and Processing Time semantics in Flink.
>
> Another approach that I could think of is getting rid of
> 'System.currentTimeMillis()' and only allowing time to be obtained via a
> special interface that WindowAssigners implement. Then we could determine what
> time is assigned and also verify that it is actually used (in contrast
> to the isEventTime() method). Would that be an option or would it
> break the API?
>
> Cheers,
> Max

Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek-2
Hi,
I created a new doc specifically about the interplay of lateness and window
state garbage collection:
https://docs.google.com/document/d/1vgukdDiUco0KX4f7tlDJgHWaRVIU-KorItWgnBapq_8/edit?usp=sharing

There is still some stuff that needs to be figured out, both in the new doc
and the existing doc. For example, we need to decide whether to make
accumulating/discarding behavior global for a window operation or
controllable by triggers. Initially, I suggested making
accumulating/discarding a global setting for the window operation because
we can get away with keeping less state if we know that we always discard
when firing. Please take a look at the new doc to see what I'm talking
about there.

Feedback very welcome!

Cheers,
Aljoscha

On Tue, 26 Apr 2016 at 16:45 Aljoscha Krettek <[hidden email]> wrote:

> Hi Max,
> thanks for the feedback and suggestions! I'll try to address each
> paragraph separately.
>
> I'm afraid deciding based on the "StreamTimeCharacteristic" is not possible
> since a user can use processing-time windows in their job even though they
> set the characteristic to event-time. Enabling event time does not disable
> processing time, it just enables an additional feature. (IMHO, the handling
> of the StreamTimeCharacteristic is still somewhat problematic.)
>
> Making the decision based purely on the class of the WindowAssigner is
> also not possible since we don't know in advance which WindowAssigners the
> users will write and what time characteristic they will use.
>
> Regarding the third proposition: removing 'System.currentTimeMillis()' is
> very desirable and part of my proposal. However, it is still meant to be
> separate from "event-time" since a Trigger/WindowAssigner might need both.
> For example, a Trigger might want to do early triggering a few
> (processing-time) seconds after the first elements arrive and proper
> triggering once the watermark for the end of the window arrives.
>
> These are good ideas but I'm afraid we still don't have a good solution.
> This whole processing time/event time business is just very tricky.
>
> Cheers,
> Aljoscha

Re: [DISCUSS] Allowed Lateness in Flink

Gyula Fóra
Thanks Aljoscha :) I added some comments that might be relevant from the
user's point of view.

Gyula

On Mon, 30 May 2016 at 10:33 Aljoscha Krettek <[hidden email]> wrote:

> Hi,
> I created a new doc specifically about the interplay of lateness and
> window state garbage collection:
> https://docs.google.com/document/d/1vgukdDiUco0KX4f7tlDJgHWaRVIU-KorItWgnBapq_8/edit?usp=sharing
>
> There is still some stuff that needs to be figured out, both in the new
> doc and the existing doc. For example, we need to decide whether to make
> accumulating/discarding behavior global for a window operation or
> controllable by triggers. Initially, I suggested making
> accumulating/discarding a global setting for the window operation because
> we can get away with keeping less state if we know that we always discard
> when firing. Please take a look at the new doc to see what I'm talking
> about there.
>
> Feedback very welcome!
>
> Cheers,
> Aljoscha

Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek-2
Thanks for the feedback! :-) I already read the comments on the file.

On Mon, 30 May 2016 at 11:10 Gyula Fóra <[hidden email]> wrote:

> Thanks Aljoscha :) I added some comments that might be relevant from the
> user's point of view.
>
> Gyula
> completely
> >>> >> ignored, i.e. simply discarded
> >>> >>
> >>> >> - Accumulating/Discarding Fired Windows: When we fire a window, do
> we
> >>> >> purge the contents or do we keep it around until the watermark
> passes
> >>> the
> >>> >> end of end window plus the allowed lateness? If we keep the window a
> >>> late
> >>> >> element will be added to the window and the window will be emitted
> >>> again.
> >>> >> If don't keep the window then the late element will essentially
> >>> trigger
> >>> >> emission of a one-element window.
> >>> >>
> >>> >> This is somewhat straightforward to implement: If accumulating set a
> >>> timer
> >>> >> for the end of the window plus the allowed lateness. Cleanup the
> >>> window
> >>> >> when that fires (basically). All in event-time with watermarks.
> >>> >>
> >>> >>  My problem is only this: what should happen if the user specifies
> >>> some
> >>> >> allowed lateness and/or accumulating mode but uses processing-time
> >>> >> windowing. For processing-time windows these don't make sense
> because
> >>> >> elements cannot can be late by definition. The problem is, that we
> >>> cannot
> >>> >> figure out, by looking at a WindowAssigner or the Windows that it
> >>> assigns
> >>> >> to elements whether these windows are in event-time or
> processing-time
> >>> >> domain. At the API level this is also not easily visible, since a
> user
> >>> >> might have set the "stream-time-characteristic" to event-time but
> >>> still use
> >>> >> a processing-time window (plus trigger) in the program.
> >>> >>
> >>> >> Any ideas for solving this are extremely welcome. :-)
> >>> >>
> >>> >> Cheers,
> >>> >> Aljoscha
> >>> >>
> >>> >> [1]
> >>> >>
> >>>
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.psfzjlv68tp
> >>> >>
> >>>
> >>
>

Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek-2
Hi,
I cleaned up the document a bit and added sections to address comments on
the doc:
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing
(I also marked proposed features that are already implemented as [done].)

The main thing that remains to be figured out is how we deal with purging,
i.e. whether the trigger can decide to purge a window or whether the
WindowOperator should do this and also what happens when window state is
garbage collected. The original proposal was to reduce the current set of
trigger results from (CONTINUE, FIRE, PURGE, FIRE_AND_PURGE) to (CONTINUE,
FIRE) and have a global flag in the WindowOperator that says whether firing
windows should be purged (DISCARDING) or kept for a bit, until the allowed
lateness expires (ACCUMULATING). Based on comments by Elias I added a
section that sketches an alternative where the triggers are in charge of
purging and also decide what should happen in case of window cleanup.
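
To make the difference between the two modes concrete, here is a minimal
sketch in Java. All names in it (FiringModeSketch, WindowState, FiringMode)
are made up for illustration and are not Flink classes; it only assumes the
reduced trigger result set (CONTINUE, FIRE) and a single global flag, as in
the original proposal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink's WindowOperator.
class FiringModeSketch {

    // Reduced trigger result set from the original proposal.
    enum TriggerResult { CONTINUE, FIRE }

    // Assumed global flag on the window operation.
    enum FiringMode { DISCARDING, ACCUMULATING }

    static final class WindowState {
        private final FiringMode mode;
        private final List<Integer> contents = new ArrayList<>();
        final List<List<Integer>> emitted = new ArrayList<>();

        WindowState(FiringMode mode) {
            this.mode = mode;
        }

        // Adds an element and applies the trigger's decision for it.
        void add(int element, TriggerResult result) {
            contents.add(element);
            if (result == TriggerResult.FIRE) {
                emitted.add(new ArrayList<>(contents));
                if (mode == FiringMode.DISCARDING) {
                    // The operator purges on every firing; the trigger has no say.
                    contents.clear();
                }
            }
        }
    }

    // Two on-time elements, one firing, then one late element that fires again.
    static List<List<Integer>> run(FiringMode mode) {
        WindowState window = new WindowState(mode);
        window.add(1, TriggerResult.CONTINUE);
        window.add(2, TriggerResult.FIRE);
        window.add(3, TriggerResult.FIRE); // late element
        return window.emitted;
    }

    public static void main(String[] args) {
        System.out.println("DISCARDING:   " + run(FiringMode.DISCARDING));
        System.out.println("ACCUMULATING: " + run(FiringMode.ACCUMULATING));
    }
}
```

In this sketch DISCARDING emits [1, 2] and then the one-element window [3],
while ACCUMULATING emits [1, 2] and then [1, 2, 3]. Cleanup of the kept
contents once the allowed lateness expires is left out on purpose.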

One thing we should also keep in mind is how we can make the windowing API
easy to use for people that don't need all the bells and whistles of custom
triggers, allowed lateness and so on. This is partially covered by the
proposal to add composite triggers but I feel we can go further there.

In the future, it might be good to have discussions directly on the ML and
then change the document accordingly. This way everyone can follow the
discussion on the ML. I also feel that Google Doc comments often don't give
enough space for expressing more complex opinions.

Cheers,
Aljoscha


On Mon, 30 May 2016 at 11:23 Aljoscha Krettek <[hidden email]> wrote:

> Thanks for the feedback! :-) I already read the comments on the file.

Re: [DISCUSS] Allowed Lateness in Flink

Ufuk Celebi-2
On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek <[hidden email]> wrote:
> In the future, it might be good to have discussions directly on the ML and
> then change the document accordingly. This way everyone can follow the
> discussion on the ML. I also feel that Google Doc comments often don't give
> enough space for expressing more complex opinions.

I agree! Would you mind raising this point as a separate discussion on dev@?

Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek-2
I did:
https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3cCANMXwW0AbTTjjg9EWdxRUGxkjM7jscBeNmVRZOHPt2qO3pQMwA@...%3e
 ;-)


Re: [DISCUSS] Allowed Lateness in Flink

Vishnu Viswanath
Hi,

I was going through the suggested windowing improvements, and I have a
few questions and suggestions regarding the Evictor.

1) I have a use case where I need a custom Evictor that evicts elements
from the window based on their value (e.g., if the elements are of
case class Item(id: Int, type: String), then evict the elements that have
type="a"). I believe this is not currently possible.
2) This is somewhat related to 1): there should be an option to evict
elements from anywhere in the window, not only from the beginning of the
window (e.g., apply the delta function to all elements and remove all
those that don't pass). I checked the code: the evict method just returns
the number of elements to be removed, and processTriggerResult just skips
that many elements from the beginning.
3) Add an option that lets the user decide whether the eviction should
happen before or after the apply function. Currently it happens before
the apply function, but I have a use case where I need to apply the
function first and evict afterward.
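
As a rough illustration of 1) and 2), a value-based evictor could look like
the sketch below. It is written in Java, and everything in it is
hypothetical: the IteratorEvictor interface, the evictIf helper, and the
Item class only mirror the case class above, and none of them are existing
Flink types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch; none of these types are part of Flink.
class ValueEvictorSketch {

    // Mirrors the case class Item(id: Int, type: String) from the mail.
    static final class Item {
        final int id;
        final String type;

        Item(int id, String type) {
            this.id = id;
            this.type = type;
        }
    }

    // Assumed evictor contract: it may remove any element through the
    // iterator instead of returning a count to skip from the beginning.
    interface IteratorEvictor<T> {
        void evict(Iterator<T> elements);
    }

    // Builds an evictor that removes every element matching the predicate.
    static <T> IteratorEvictor<T> evictIf(Predicate<T> shouldEvict) {
        return elements -> {
            while (elements.hasNext()) {
                if (shouldEvict.test(elements.next())) {
                    elements.remove();
                }
            }
        };
    }

    // Copies the window contents, evicts items of type "a", returns the ids left.
    static List<Integer> remainingIds(List<Item> window) {
        List<Item> buffer = new ArrayList<>(window);
        IteratorEvictor<Item> evictor = evictIf(item -> "a".equals(item.type));
        evictor.evict(buffer.iterator());
        List<Integer> ids = new ArrayList<>();
        for (Item item : buffer) {
            ids.add(item.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Item> window = Arrays.asList(
                new Item(1, "a"), new Item(2, "b"), new Item(3, "a"), new Item(4, "c"));
        System.out.println(remainingIds(window));
    }
}
```

With a contract like this, 3) would come down to whether the operator calls
evict before or after it invokes the window function.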

I am doing this for a POC, so I think I can modify the Flink code base to
make these changes and build it, but I would appreciate any suggestions on
whether these are viable changes and whether they would cause any
performance issues. Also, any pointers on where to start would help (e.g.,
do I create a new class similar to EvictingWindowOperator that extends
WindowOperator?).

Thanks and Regards,
Vishnu Viswanath,


Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek-2
@Vishnu Funny you should ask that because I have a design doc lying around.
I'll open a new mail thread to not hijack this one.


Re: [DISCUSS] Allowed Lateness in Flink

Kostas Kloudas
Hi,

In the effort to move the discussion to the mailing list, rather than the doc,
there was a comment in the doc:

"It seems this proposal marries the allowed lateness of events and the discarding of window state. In most use cases this should be sufficient, but there are instances where having independent control of these may be useful.

For instance, you may have a job that computes some aggregate, like a sum. You may want to keep the window state around for a while, but not too long. Yet you may want to continue processing late events after you discarded the window state. It is possible that your stream sinks can make use of this data. For instance, they may be writing to a data store that returns an error if a row already exists, which allows the sink to read the existing row and update it with the new data."

To which I would like to reply:

If I understand your use case correctly, I believe that the proposed binding of the allowed lateness to the state purging does not pose any problem. The lateness specifies the upper time bound after which the state will be discarded. Between the start of a window and its (end + allowedLateness), you can write custom triggers that fire, purge the state, or do nothing. Given this, I suppose that, in the most extreme case, you can specify an allowed lateness of Long.MaxValue and do the purging of the state "manually". By doing this, you remove the safeguard of letting the system purge the state at some point in time, and you can do your own custom state management that fits your needs.
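
As a small illustration of that bound, the following Java sketch computes the point in event time at which window state could be dropped. The class and method names are assumptions made for this example and are not Flink API; the one detail worth noting is that end + allowedLateness overflows when the lateness is the maximum long value, so the sketch saturates instead.

```java
// Hypothetical helper; names and semantics are assumptions, not Flink API.
class CleanupTimeSketch {

    // Event-time timestamp at which the state of a window may be dropped.
    // Saturates at Long.MAX_VALUE so that a huge allowed lateness cannot
    // overflow into the past. Assumes windowEnd is a non-negative timestamp.
    static long cleanupTime(long windowEnd, long allowedLateness) {
        if (allowedLateness < 0) {
            throw new IllegalArgumentException("allowedLateness must be >= 0");
        }
        long candidate = windowEnd + allowedLateness;
        return candidate >= windowEnd ? candidate : Long.MAX_VALUE;
    }

    // True once the watermark has reached the cleanup time of the window.
    static boolean stateExpired(long watermark, long windowEnd, long allowedLateness) {
        return watermark >= cleanupTime(windowEnd, allowedLateness);
    }

    public static void main(String[] args) {
        System.out.println(cleanupTime(1000L, 500L));
        System.out.println(cleanupTime(1000L, Long.MAX_VALUE));
        System.out.println(stateExpired(1499L, 1000L, 500L));
        System.out.println(stateExpired(1500L, 1000L, 500L));
    }
}
```

With the maximum lateness the cleanup time is effectively never reached by a regular watermark, which is the "manual purging" case described above: only a trigger that purges would free the state.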

Thanks,
Kostas


Re: [DISCUSS] Allowed Lateness in Flink

chenqin
+1 for allowedLateness scenario.

The rationale is that backfills or data issues can hold back in-window
data until the watermark has passed the window's end time. This causes the
sink to write partial output.

Allowing a high allowedLateness threshold makes it easier to merge those
results and overwrite the partial output with the correct output at the
sink. But yes, the pipeline author risks blowing up the state backend with
huge state.

Alternatively, in some cases, if the sink allows a read-check-merge
operation, the window can explicitly call out events ingested after
allowedLateness. This asks the pipeline author to mitigate these events
outside of the Flink ecosystem. The catch is that, since any part of a
Flink job can replay and checkpoint, the notification should somehow
include this information as well.
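
A very rough sketch of what "explicitly calling out" such events could mean
is below, in Java. The names (LateElementSketch, Lateness, collectTooLate)
and the exact boundary conditions are assumptions for illustration only;
this is not an existing Flink mechanism, and it ignores the replay and
checkpoint concern.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not an existing Flink mechanism.
class LateElementSketch {

    enum Lateness { ON_TIME, LATE_BUT_ALLOWED, TOO_LATE }

    // Classifies an element by the end of the window it belongs to.
    // Assumes non-negative timestamps and an exclusive window end.
    static Lateness classify(long watermark, long windowEnd, long allowedLateness) {
        if (watermark < windowEnd) {
            return Lateness.ON_TIME;
        }
        // Subtract instead of adding so a very large allowedLateness cannot overflow.
        if (watermark - windowEnd < allowedLateness) {
            return Lateness.LATE_BUT_ALLOWED;
        }
        return Lateness.TOO_LATE;
    }

    // Collects the window ends of elements that would otherwise be dropped
    // silently, so that they can be handed to some external mitigation step.
    static List<Long> collectTooLate(long watermark, long allowedLateness, long[] windowEnds) {
        List<Long> tooLate = new ArrayList<>();
        for (long end : windowEnds) {
            if (classify(watermark, end, allowedLateness) == Lateness.TOO_LATE) {
                tooLate.add(end);
            }
        }
        return tooLate;
    }

    public static void main(String[] args) {
        long[] windowEnds = {1000L, 1600L, 3000L};
        System.out.println(collectTooLate(2000L, 500L, windowEnds));
    }
}
```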

Thanks
Chen


Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek-2
@Chen I added a section at the end of the document regarding access to the
elements that are dropped as late. Right now, the section just mentions
that we have to do this but there is no real proposal yet for how to do it.
Only a rough sketch so that we don't forget about it.

On Fri, 8 Jul 2016 at 07:47 Chen Qin <[hidden email]> wrote:

> +1 for allowedLateness scenario.
>
> The rationale behind is there are backfills or data issues hold in-window
> data till watermark pass end time. It cause sink write partial output.
>
> Allow high allowedLateness threshold makes life easier to merge those
> results and overwrite partial output with correct output at sink. But yeah,
> pipeline author is at risk of blow up statebackend with huge states.
>
> Alternatively, In some case, if sink allows read-check-merge operation,
> window can explicit call out events ingested after allowedLateness. It asks
> pipeline author mitigated these events in a way outside of flink ecosystem.
> The catch is that since everywhere in a flink job can replay and
> checkpoint, notification should somehow includes these info as well.
>
> Thanks
> Chen
>

Re: [DISCUSS] Allowed Lateness in Flink

chenqin
Sure. Currently, it looks like any element assigned to a window that is
already too late will be dropped silently 😓?

Having a late window stream implies that Flink somehow needs to add one more
state to the window and split window state cleanup from window retirement.
As a simpler option, I would suggest adding a function to the trigger called
OnLateElement that always returns fire_purge. It would make the developer
aware of late elements and able to handle this case.
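
A minimal sketch of this suggestion, not Flink's actual Trigger API: the
names TriggerResult, LateAwareTrigger and ToyWindowOperator are hypothetical
and only model a single window. It assumes an element counts as "too late"
once the watermark has reached the window end plus the allowed lateness, and
that the late element is then emitted on its own instead of being dropped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Trigger outcomes as named in the thread; a stand-in, not Flink's enum. */
enum TriggerResult { CONTINUE, FIRE, FIRE_AND_PURGE }

/** Hypothetical trigger with the suggested late-element hook. */
interface LateAwareTrigger<T> {
    TriggerResult onElement(T element, long timestamp, long windowEnd);

    // Suggested hook: invoked for elements that arrive after cleanup.
    // Defaults to FIRE_AND_PURGE, as proposed above.
    default TriggerResult onLateElement(T element, long timestamp, long windowEnd) {
        return TriggerResult.FIRE_AND_PURGE;
    }
}

/** Toy operator for one window that routes elements through the trigger. */
final class ToyWindowOperator<T> {
    private final long windowEnd;
    private final long cleanupTime;
    private final LateAwareTrigger<T> trigger;
    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> emitted = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    ToyWindowOperator(long windowEnd, long allowedLateness, LateAwareTrigger<T> trigger) {
        this.windowEnd = windowEnd;
        // Saturating add, assuming allowedLateness >= 0.
        long sum = windowEnd + allowedLateness;
        this.cleanupTime = sum < windowEnd ? Long.MAX_VALUE : sum;
        this.trigger = trigger;
    }

    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
    }

    void process(T element, long timestamp) {
        if (watermark >= cleanupTime) {
            // Window state is gone: surface the element alone instead of dropping it.
            TriggerResult r = trigger.onLateElement(element, timestamp, windowEnd);
            if (r != TriggerResult.CONTINUE) {
                emitted.add(Collections.singletonList(element));
            }
            return;
        }
        buffer.add(element);
        TriggerResult r = trigger.onElement(element, timestamp, windowEnd);
        if (r != TriggerResult.CONTINUE) {
            emitted.add(new ArrayList<>(buffer));
            if (r == TriggerResult.FIRE_AND_PURGE) {
                buffer.clear();
            }
        }
    }

    List<List<T>> emitted() {
        return emitted;
    }
}
```

A downstream consumer could then treat single-element emissions that arrive
after cleanup as corrections to merge at the sink.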

Chen



On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <[hidden email]>
wrote:

> @Chen I added a section at the end of the document regarding access to the
> elements that are dropped as late. Right now, the section just mentions
> that we have to do this but there is no real proposal yet for how to do it.
> Only a rough sketch so that we don't forget about it.

Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek-2
Hi,
Chen commented this on the doc (I'm mirroring here so everyone can follow):
"It would be cool to be able to access last snapshot of window states
before it get purged. Pipeline author might consider put it to external
storage and deal with late arriving events by restore corresponding window."

My answer:
This is partially covered by the section called "What Happens at
Window-Cleanup Time, Who Decides When to Purge". What I want to introduce
is that the window can have one final emission if there is new data in the
buffers at cleanup time.
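
A rough sketch of that final emission, under stated assumptions: the class
CleanupWindow and its methods are hypothetical, the window accumulates
(fired contents are kept), and cleanup happens once the watermark reaches
the window end plus the allowed lateness.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy accumulating window that sums integers; not a Flink class. */
final class CleanupWindow {
    private final long cleanupTime;
    private final List<Integer> buffer = new ArrayList<>();
    private final List<Integer> emissions = new ArrayList<>();
    private boolean dirty = false;   // elements added since the last firing
    private boolean cleaned = false; // state already purged

    CleanupWindow(long windowEnd, long allowedLateness) {
        this.cleanupTime = windowEnd + allowedLateness;
    }

    void add(int value) {
        if (cleaned) {
            return; // state is gone, nothing to add to
        }
        buffer.add(value);
        dirty = true;
    }

    /** Emits the sum of everything buffered so far and keeps the contents. */
    void fire() {
        int sum = 0;
        for (int v : buffer) {
            sum += v;
        }
        emissions.add(sum);
        dirty = false;
    }

    /** At cleanup time, fire once more only if new data arrived, then purge. */
    void onWatermark(long watermark) {
        if (!cleaned && watermark >= cleanupTime) {
            if (dirty) {
                fire();
            }
            buffer.clear();
            cleaned = true;
        }
    }

    List<Integer> emissions() {
        return emissions;
    }
}
```

A window that received nothing after its last firing is purged without an
extra emission, so downstream consumers see no redundant result.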

The work on this will also depend on this proposal:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
With this, the WindowFunction can get metadata about the window firing, so it
could be informed that this is the last firing before a cleanup and that
there already was an earlier, on-time firing.
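
To illustrate how a window function might use such metadata, here is a
sketch with invented names. FiringInfo, MetadataAwareWindowFunction and
UpsertingSum are hypothetical and do not reflect the interface FLIP-2
actually proposes. The function decides between an insert and an update
depending on whether an on-time firing already happened.

```java
import java.util.List;

/** Hypothetical metadata handed to the window function on each firing. */
final class FiringInfo {
    final boolean lastFiringBeforeCleanup;
    final boolean onTimeFiringAlreadyHappened;

    FiringInfo(boolean lastFiringBeforeCleanup, boolean onTimeFiringAlreadyHappened) {
        this.lastFiringBeforeCleanup = lastFiringBeforeCleanup;
        this.onTimeFiringAlreadyHappened = onTimeFiringAlreadyHappened;
    }
}

/** Hypothetical window function signature that receives the metadata. */
interface MetadataAwareWindowFunction<IN, OUT> {
    OUT apply(List<IN> elements, FiringInfo info);
}

/** Sums the window and tells the sink whether to insert or update a row. */
final class UpsertingSum implements MetadataAwareWindowFunction<Integer, String> {
    @Override
    public String apply(List<Integer> elements, FiringInfo info) {
        int sum = 0;
        for (int v : elements) {
            sum += v;
        }
        // An earlier on-time firing means the sink already holds a row.
        String op = info.onTimeFiringAlreadyHappened ? "UPDATE" : "INSERT";
        String suffix = info.lastFiringBeforeCleanup ? " (final)" : "";
        return op + " sum=" + sum + suffix;
    }
}
```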

Does this cover your concerns, Chen?

Cheers,
Aljoscha

On Sun, 10 Jul 2016 at 21:24 Chen Qin <[hidden email]> wrote:

> Sure. Currently, it looks like any element assigned to a too late window
> will be dropped silently😓 ?
>
> Having a late window stream imply somehow Flink needs to add one more state
> to window and split window state cleanup from window retirement.
> I would suggest as simple as adding a function in trigger called
> OnLateElement and always fire_purge it would enable developer aware and
> handle this case.
>
> Chen

Re: [DISCUSS] Allowed Lateness in Flink

chenqin
Aljoscha,

Yes, that would work for our case!

Chen


On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <[hidden email]>
wrote:

> Hi,
> Chen commented this on the doc (I'm mirroring here so everyone can follow):
> "It would be cool to be able to access last snapshot of window states
> before it get purged. Pipeline author might consider put it to external
> storage and deal with late arriving events by restore corresponding
> window."
>
> My answer:
> This is partially covered by the section called "What Happens at
> Window-Cleanup Time, Who Decides When to Purge". What I want to introduce
> is that the window can have one final emission if there is new data in the
> buffers at cleanup time.
>
> The work on this will also depend on this proposal:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
> With
> this, the WindowFunction can get meta data about the window firing so it
> could be informed that this is the last firing before a cleanup and that
> there already was an earlier, on-time firing.
>
> Does this cover your concerns, Chen?
>
> Cheers,
> Aljoscha

Re: [DISCUSS] Allowed Lateness in Flink

chenqin
In reply to this post by Aljoscha Krettek-2
BTW, do you have a rough timeline for rolling this out to production?

Thanks,
Chen


On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <[hidden email]>
wrote:

> Hi,
> Chen commented this on the doc (I'm mirroring here so everyone can follow):
> "It would be cool to be able to access last snapshot of window states
> before it get purged. Pipeline author might consider put it to external
> storage and deal with late arriving events by restore corresponding
> window."
>
> My answer:
> This is partially covered by the section called "What Happens at
> Window-Cleanup Time, Who Decides When to Purge". What I want to introduce
> is that the window can have one final emission if there is new data in the
> buffers at cleanup time.
>
> The work on this will also depend on this proposal:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
> With
> this, the WindowFunction can get meta data about the window firing so it
> could be informed that this is the last firing before a cleanup and that
> there already was an earlier, on-time firing.
>
> Does this cover your concerns, Chen?
>
> Cheers,
> Aljoscha
>
> On Sun, 10 Jul 2016 at 21:24 Chen Qin <[hidden email]> wrote:
>
> > Sure. Currently, it looks like any element assigned to a too late window
> > will be dropped silently😓 ?
> >
> > Having a late window stream imply somehow Flink needs to add one more
> state
> > to window and split window state cleanup from window retirement.
> > I would suggest as simple as adding a function in trigger called
> > OnLateElement and always fire_purge it would enable developer aware and
> > handle this case.
> >
> > Chen
> >
> >
> >
> > On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <[hidden email]>
> > wrote:
> >
> > > @Chen I added a section at the end of the document regarding access to
> > the
> > > elements that are dropped as late. Right now, the section just mentions
> > > that we have to do this but there is no real proposal yet for how to do
> > it.
> > > Only a rough sketch so that we don't forget about it.
> > >
> > > On Fri, 8 Jul 2016 at 07:47 Chen Qin <[hidden email]> wrote:
> > >
> > > > +1 for allowedLateness scenario.
> > > >
> > > > The rationale behind is there are backfills or data issues hold
> > in-window
> > > > data till watermark pass end time. It cause sink write partial
> output.
> > > >
> > > > Allow high allowedLateness threshold makes life easier to merge those
> > > > results and overwrite partial output with correct output at sink. But
> > > yeah,
> > > > pipeline author is at risk of blow up statebackend with huge states.
> > > >
> > > > Alternatively, In some case, if sink allows read-check-merge
> operation,
> > > > window can explicit call out events ingested after allowedLateness.
> It
> > > asks
> > > > pipeline author mitigated these events in a way outside of flink
> > > ecosystem.
> > > > The catch is that since everywhere in a flink job can replay and
> > > > checkpoint, notification should somehow includes these info as well.
> > > >
> > > > Thanks
> > > > Chen
> > > >
> > > > On Thu, Jul 7, 2016 at 12:14 AM, Kostas Kloudas <[hidden email]>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > In the effort to move the discussion to the mailing list, rather than
> > > > > the doc, there was a comment in the doc:
> > > > >
> > > > > "It seems this proposal marries the allowed lateness of events and the
> > > > > discarding of window state. In most use cases this should be
> > > > > sufficient, but there are instances where having independent control
> > > > > of these may be useful.
> > > > >
> > > > > For instance, you may have a job that computes some aggregate, like a
> > > > > sum. You may want to keep the window state around for a while, but not
> > > > > too long. Yet you may want to continue processing late events after
> > > > > you discarded the window state. It is possible that your stream sinks
> > > > > can make use of this data. For instance, they may be writing to a data
> > > > > store that returns an error if a row already exists, which allows the
> > > > > sink to read the existing row and update it with the new data."
> > > > >
> > > > > To which I would like to reply:
> > > > >
> > > > > If I understand your use-case correctly, I believe that the proposed
> > > > > binding of the allowed lateness to the state purging does not pose any
> > > > > problem. The lateness specifies the upper time bound, after which the
> > > > > state will be discarded. Between the start of a window and its (end +
> > > > > allowedLateness) you can write custom triggers that fire, purge the
> > > > > state, or do nothing. Given this, I suppose that, in the most extreme
> > > > > case, you can specify an allowed lateness of Long.MaxValue and do the
> > > > > purging of the state "manually". By doing this, you remove the
> > > > > safeguard of letting the system purge the state at some point in time,
> > > > > and you can do your own custom state management that fits your needs.
> > > > >
> > > > > Thanks,
> > > > > Kostas
> > > > >
> > > > > > On Jul 6, 2016, at 5:43 PM, Aljoscha Krettek <[hidden email]>
> > > > > > wrote:
> > > > > >
> > > > > > @Vishnu Funny you should ask that because I have a design doc lying
> > > > > > around. I'll open a new mail thread to not hijack this one.
> > > > > >
> > > > > > On Wed, 6 Jul 2016 at 17:17 Vishnu Viswanath <[hidden email]>
> > > > > > wrote:
> > > > > >
> > > > > >> Hi,
> > > > > >>
> > > > > >> I was going through the suggested improvements to windowing, and I
> > > > > >> have a few questions/suggestions regarding the Evictor.
> > > > > >>
> > > > > >> 1) I have a use case where I have to create a custom Evictor that
> > > > > >> will evict elements from the window based on their value (e.g., if
> > > > > >> my elements are of case class Item(id: Int, type:String), then
> > > > > >> evict elements that have type="a"). I believe this is not currently
> > > > > >> possible.
> > > > > >> 2) This is somewhat related to 1): there should be an option to
> > > > > >> evict elements from anywhere in the window, not only from the
> > > > > >> beginning of the window (e.g., apply the delta function to all
> > > > > >> elements and remove all those that don't pass). I checked the code:
> > > > > >> the evict method just returns the number of elements to be removed,
> > > > > >> and processTriggerResult just skips that many elements from the
> > > > > >> beginning.
> > > > > >> 3) Add an option that enables the user to decide whether the
> > > > > >> eviction should happen before or after the apply function.
> > > > > >> Currently it is before the apply function, but I have a use case
> > > > > >> where I need to first apply the function and evict afterward.
> > > > > >>
> > > > > >> I am doing this for a POC, so I think I can modify the Flink code
> > > > > >> base to make these changes and build, but I would appreciate any
> > > > > >> suggestions on whether these are viable changes or whether there
> > > > > >> will be any performance issues if they are made. Also, any pointers
> > > > > >> on where to start (e.g., do I create a new class similar to
> > > > > >> EvictingWindowOperator that extends WindowOperator?)
> > > > > >>
> > > > > >> Thanks and Regards,
> > > > > >> Vishnu Viswanath,
> > > > > >>
> > > > > >> On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek <[hidden email]>
> > > > > >> wrote:
> > > > > >>
> > > > > >>> I did:
> > > > > >>>
> > > > > >>> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3cCANMXwW0AbTTjjg9EWdxRUGxkjM7jscBeNmVRZOHPt2qO3pQMwA@...%3e
> > > > > >>> ;-)
> > > > > >>>
> > > > > >>> On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi <[hidden email]> wrote:
> > > > > >>>
> > > > > >>>> On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek <[hidden email]>
> > > > > >>>> wrote:
> > > > > >>>>> In the future, it might be good to do discussions directly on
> > > > > >>>>> the ML and then change the document accordingly. This way
> > > > > >>>>> everyone can follow the discussion on the ML. I also feel that
> > > > > >>>>> Google Doc comments often don't give enough space for expressing
> > > > > >>>>> more complex opinions.
> > > > > >>>>
> > > > > >>>> I agree! Would you mind raising this point as a separate
> > > > > >>>> discussion on dev@?
> > > > > >>>>
>
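
The bound described in Kostas' reply quoted above (window state is kept until end + allowedLateness) can be sketched as follows. This is only a model under stated assumptions, not Flink's implementation: the class LatenessModel and its method names are hypothetical, timestamps are plain long milliseconds, allowedLateness is assumed to be non-negative, and state is assumed to expire once the watermark reaches the cleanup time. The addition saturates, so an allowed lateness of Long.MAX_VALUE (written Long.MaxValue in the mail) pushes the cleanup time to the largest representable timestamp instead of overflowing.

```java
/** Hypothetical model of the "end + allowedLateness" bound; not Flink API. */
final class LatenessModel {

    private LatenessModel() {}

    /**
     * Time at which window state would be dropped.
     * Assumes allowedLateness >= 0; saturates at Long.MAX_VALUE on overflow.
     */
    static long cleanupTime(long windowEnd, long allowedLateness) {
        long t = windowEnd + allowedLateness;
        // With a non-negative lateness, a sum smaller than windowEnd means it wrapped around.
        return t < windowEnd ? Long.MAX_VALUE : t;
    }

    /** Assumption: state is purged once the watermark has reached the cleanup time. */
    static boolean isStateExpired(long windowEnd, long allowedLateness, long watermark) {
        return watermark >= cleanupTime(windowEnd, allowedLateness);
    }
}
```

For a window ending at 1000 with an allowed lateness of 500, this model keeps the state while the watermark is below 1500. Any purging before that point would be up to a custom trigger, as the reply describes.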

Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek-2
Hi,
these new features should make it into the 1.2 release. We are already
working on releasing 1.1, so unfortunately they won't make it into that one.

Cheers,
Aljoscha

On Mon, 18 Jul 2016 at 23:19 Chen Qin <[hidden email]> wrote:

> BTW, do you have a rough timeline for rolling it out to production?
>
> Thanks,
> Chen
>
>
> On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <[hidden email]>
> wrote:
>
> > Hi,
> > Chen commented this on the doc (I'm mirroring here so everyone can
> > follow):
> > "It would be cool to be able to access the last snapshot of window state
> > before it gets purged. The pipeline author might consider putting it into
> > external storage and dealing with late-arriving events by restoring the
> > corresponding window."
> >
> > My answer:
> > This is partially covered by the section called "What Happens at
> > Window-Cleanup Time, Who Decides When to Purge". What I want to introduce
> > is that the window can have one final emission if there is new data in
> > the buffers at cleanup time.
> >
> > The work on this will also depend on this proposal:
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
> >
> > With this, the WindowFunction can get metadata about the window firing, so
> > it could be informed that this is the last firing before a cleanup and
> > that there already was an earlier, on-time firing.
> >
> > Does this cover your concerns, Chen?
> >
> > Cheers,
> > Aljoscha
> >
> > On Sun, 10 Jul 2016 at 21:24 Chen Qin <[hidden email]> wrote:
> >
> > > Sure. Currently, it looks like any element assigned to a too-late
> > > window will be dropped silently 😓?
> > >
> > > Having a late-window stream implies that Flink somehow needs to add one
> > > more state to the window and split window state cleanup from window
> > > retirement.
> > > I would suggest something as simple as adding a function to the trigger
> > > called OnLateElement that always does fire_purge; it would make the
> > > developer aware of this case and able to handle it.
> > >
> > > Chen
> > >
> > >
> > >
> > > On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <[hidden email]>
> > > wrote:
> > >
> > > > @Chen I added a section at the end of the document regarding access to
> > > > the elements that are dropped as late. Right now, the section just
> > > > mentions that we have to do this, but there is no real proposal yet for
> > > > how to do it. Only a rough sketch so that we don't forget about it.
> > > >
>
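
Chen's suggestion quoted above, making late elements visible instead of dropping them silently, might look roughly like the following sketch. All names here (LateAwareWindow, the onLateElement callback) are hypothetical and this is not the Trigger API under discussion. It models a single window with plain long timestamps and assumes that end + allowedLateness does not overflow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical single-window model; not Flink API. */
final class LateAwareWindow<T> {
    private final long end;                  // end timestamp of the window
    private final long allowedLateness;      // assumed non-negative, no overflow with end
    private final Consumer<T> onLateElement; // invoked instead of a silent drop
    private final List<T> contents = new ArrayList<>();

    LateAwareWindow(long end, long allowedLateness, Consumer<T> onLateElement) {
        this.end = end;
        this.allowedLateness = allowedLateness;
        this.onLateElement = onLateElement;
    }

    /**
     * Adds the element if the window is still within its allowed lateness.
     * Returns false if the element was handed to the late-element callback instead.
     */
    boolean add(T element, long currentWatermark) {
        if (currentWatermark >= end + allowedLateness) {
            onLateElement.accept(element);
            return false;
        }
        contents.add(element);
        return true;
    }

    /** Elements accepted into the window so far. */
    List<T> contents() {
        return contents;
    }
}
```

The callback is where a pipeline author could, for example, forward the element to a sink that supports the read-check-merge operation mentioned earlier in the thread.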