Time-stamp of output from a window operator

Jagadish
Hey Flink developers,

I'm trying to understand the behavior of timestamp assignments in Apache
Flink.

Here's my current understanding:

1. A *TimestampAssigner* returns the timestamp of each incoming message,
while a *WatermarkAssigner* emits the watermark for a source. (If
per-operator watermarks are defined, they override the watermarks
generated from the source.)
2. For each operator, its watermark is the minimum of all the
incoming watermarks from its upstream operators.
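Point 2 can be illustrated with a small plain-Python model. It is a sketch of the idea, not Flink's implementation, and the class name `InputWatermarks` is hypothetical: the operator remembers the latest watermark seen on each input and forwards the minimum, and only when that minimum advances.

```python
class InputWatermarks:
    """Hypothetical model: an operator's watermark is the minimum of the
    latest watermark received on each of its input channels."""

    def __init__(self, num_inputs):
        # Before any watermark arrives, every input is at "minus infinity".
        self.per_input = [float("-inf")] * num_inputs
        self.current = float("-inf")

    def on_watermark(self, input_index, watermark):
        # A single input's watermark never moves backwards.
        self.per_input[input_index] = max(self.per_input[input_index], watermark)
        combined = min(self.per_input)
        # Return the new operator watermark only if it advanced, else None.
        if combined > self.current:
            self.current = combined
            return combined
        return None


op = InputWatermarks(num_inputs=2)
print(op.on_watermark(0, 100))  # None: input 1 has not reported yet
print(op.on_watermark(1, 80))   # 80: min(100, 80)
print(op.on_watermark(1, 150))  # 100: input 0 is now the slowest
```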

I'm not sure how the *timestamp* of an operator's output is determined.
For instance, for a pane emitted from an *EventTimeTumblingWindow*, what
is its "timestamp"?

What is the timestamp of a late arrival?

If I apply another event-time window to the output of a previous window,
is its behavior well defined?

If there are pointers to the source code I need to look at, I'd appreciate
that as well. Thank you for the help.

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University

Re: Time-stamp of output from a window operator

Eron Wright
Each event in the stream has an associated timestamp as metadata.  A
timestamp assigner simply extracts the timestamp from the user object for
that purpose.

There is no per-operator watermark, but with
`assignTimestampsAndWatermarks` you may insert an operator that overrides
upstream watermarks.  Watermarks are markers that flow through the stream
to inform operators about the progression of time.   Time-sensitive
operators (e.g. window operators) use the watermark to take action at
specific points in time, such as at the time boundary of a window.
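A minimal plain-Python model of what a timestamp/watermark assigner does. It follows the idea behind Flink's `BoundedOutOfOrdernessTimestampExtractor` (the watermark trails the largest timestamp seen so far by a fixed bound), but it is not Flink code; the `Click` record type and its `event_time_ms` field are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Click:
    # Hypothetical user record; the event time lives inside the payload.
    user: str
    event_time_ms: int


class BoundedOutOfOrdernessAssigner:
    """Sketch of a timestamp/watermark assigner: extracts each record's
    timestamp and reports a watermark that trails the largest timestamp
    seen so far by a fixed bound."""

    def __init__(self, max_out_of_orderness_ms):
        self.bound = max_out_of_orderness_ms
        self.max_seen = None

    def extract_timestamp(self, record):
        # The extracted value becomes metadata travelling with the element.
        ts = record.event_time_ms
        self.max_seen = ts if self.max_seen is None else max(self.max_seen, ts)
        return ts

    def current_watermark(self):
        # Polled periodically; records at or below this time are no longer expected.
        if self.max_seen is None:
            return None
        return self.max_seen - self.bound


assigner = BoundedOutOfOrdernessAssigner(max_out_of_orderness_ms=5_000)
for click in [Click("a", 10_000), Click("b", 18_000), Click("c", 12_000)]:
    assigner.extract_timestamp(click)
print(assigner.current_watermark())  # 13000: 18000 - 5000
```

An out-of-order record (12000 above) still gets its own timestamp but does not pull the watermark backwards.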

When a window function is invoked for a given window, any elements emitted
by the function take the 'max timestamp' of the window. For example, if a
given window spans 11:00 to 12:00, any elements produced by the window
function will have a timestamp of 12:00 (strictly, 11:59:59.999: the window
end is exclusive, and the max timestamp is one millisecond before it). The
window function is fired when the current time (as indicated by the
watermark) reaches 12:00. Now, imagine that an event arrives after 12:00
that has a timestamp of, say, 11:55. That record would be considered late,
but logically still belongs in the 11:00-12:00 window. Assuming the window
operator was configured with enough allowed lateness (that is, the watermark
has not yet passed 12:00 plus the allowed lateness when the record arrives),
the window would be re-evaluated, and any elements produced would again have
a timestamp of 12:00.
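The timing rules described above can be modeled in plain Python. This is a sketch, not Flink's `WindowOperator`: it assumes millisecond timestamps, a window `[start, end)` whose max timestamp is `end - 1` (the timestamp its output carries), and the default event-time trigger behavior. The helper names (`hm`, `assign_tumbling`, `classify`) are hypothetical.

```python
MINUTE_MS = 60 * 1000
HOUR_MS = 60 * MINUTE_MS


def hm(hours, minutes=0):
    # Hypothetical helper: milliseconds since midnight.
    return hours * HOUR_MS + minutes * MINUTE_MS


def assign_tumbling(ts, size_ms):
    # A tumbling window is [start, start + size); returns (start, end).
    start = ts - ts % size_ms
    return start, start + size_ms


def max_timestamp(window):
    # Latest timestamp that still belongs to the window; results carry it.
    return window[1] - 1


def classify(ts, size_ms, watermark, allowed_lateness_ms):
    """What happens to an element that arrives when the watermark is `watermark`."""
    window = assign_tumbling(ts, size_ms)
    if max_timestamp(window) > watermark:
        return window, "buffered, included in the on-time firing"
    if max_timestamp(window) + allowed_lateness_ms > watermark:
        return window, "late firing"
    return window, "dropped"


w, outcome = classify(hm(11, 55), HOUR_MS, watermark=hm(12, 3),
                      allowed_lateness_ms=5 * MINUTE_MS)
print(w == (hm(11), hm(12)), outcome)   # True late firing
print(max_timestamp(w) == hm(12) - 1)   # True: output stamped 11:59:59.999
```

What decides the outcome is how far the watermark has moved past the window's end, not how far the element's own timestamp (11:55) lies from it.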

You can chain together window operators.  For example, we described an
"hourly" window above.  A subsequent "daily" window could further aggregate
the results of each hour.    If a late firing were to occur in the "hourly"
window operator, the subsequent "daily" window operator would observe a
late element and apply its own lateness logic.
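The hourly-then-daily chain can be sketched as the same aggregation applied twice. This is a batch-style plain-Python model that ignores watermarks and lateness; `window_of` and `aggregate` are hypothetical helpers, and each result is stamped with its window's max timestamp (`end - 1`), as in the paragraph above.

```python
MINUTE_MS = 60 * 1000
HOUR_MS = 60 * MINUTE_MS
DAY_MS = 24 * HOUR_MS


def window_of(ts, size_ms):
    # Tumbling window [start, start + size) containing ts.
    start = ts - ts % size_ms
    return start, start + size_ms


def aggregate(records, size_ms):
    """Sum (timestamp, value) records per tumbling window and stamp each
    result with the window's max timestamp (end - 1)."""
    sums = {}
    for ts, value in records:
        w = window_of(ts, size_ms)
        sums[w] = sums.get(w, 0) + value
    return [(end - 1, total) for (_, end), total in sorted(sums.items())]


events = [
    (10 * HOUR_MS + 15 * MINUTE_MS, 1),
    (10 * HOUR_MS + 40 * MINUTE_MS, 2),
    (11 * HOUR_MS + 5 * MINUTE_MS, 3),
    (23 * HOUR_MS + 59 * MINUTE_MS, 4),  # last hour of day 0
    (DAY_MS + 30 * MINUTE_MS, 5),        # first hour of day 1
]
hourly = aggregate(events, HOUR_MS)
daily = aggregate(hourly, DAY_MS)
print([total for _, total in daily])  # [10, 5]
```

The 23:00-24:00 hourly result is stamped 23:59:59.999, so it lands in day 0; if it were stamped with the exclusive end (24:00), it would fall into day 1.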

Hope this helps!
Eron

In reply to Jagadish Venkatraman, Wed, Dec 20, 2017 at 11:10 PM.

Re: Time-stamp of output from a window operator

Jagadish
Thanks for the super-helpful reply!

Would the late firing from the "hourly" window be considered an on-time
arrival by the subsequent "daily" window operator? Since its window
duration is much longer (1 day), I'd expect the on-time (default) result
emitted from the "daily" window to include the late firings of previous
hourly windows. Is that right?



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/

Re: Time-stamp of output from a window operator

Eron Wright
That is correct: the window operator happily accepts late elements into the
applicable window(s). Though the element is late (with respect to the
current watermark), it might still be included in the main firing or
provoke a late firing.
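To make the distinction concrete, here is a plain-Python sketch for the "daily" operator. The `daily_outcome` helper is hypothetical and models the rules of Flink's default event-time trigger: an element goes into the main firing while its window's max timestamp is still above the watermark, causes a late firing while max timestamp plus allowed lateness is above it, and is dropped otherwise.

```python
HOUR_MS = 60 * 60 * 1000
DAY_MS = 24 * HOUR_MS


def daily_outcome(element_ts, watermark, allowed_lateness_ms=0):
    """Returns (is the element behind the watermark?, what the daily window does)."""
    start = element_ts - element_ts % DAY_MS
    max_ts = start + DAY_MS - 1
    behind_watermark = element_ts <= watermark
    if max_ts > watermark:
        outcome = "main firing"
    elif max_ts + allowed_lateness_ms > watermark:
        outcome = "late firing"
    else:
        outcome = "dropped"
    return behind_watermark, outcome


# A late firing of the 11:00-12:00 hourly window is stamped 11:59:59.999 and
# reaches the daily operator when the watermark is already at 12:05.
print(daily_outcome(12 * HOUR_MS - 1, watermark=12 * HOUR_MS + 5 * 60_000))
# (True, 'main firing')
```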

Results emitted downstream of an operator that uses late firing must be
de-duplicated by some means, because a late firing emits an updated result
for a window that has already fired. Other solutions: route late elements
to the window operator's side output (`sideOutputLateData`), or be more
conservative with your watermarks (decreasing complexity at the expense of
latency).
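One possible de-duplication approach, sketched in plain Python: treat each firing as an upsert keyed by (key, window), so a late firing replaces the earlier result instead of being counted twice. The `LatestPerWindow` sink is hypothetical, and the sketch assumes each late firing re-emits the full updated aggregate for its window (as a firing that does not purge window state would).

```python
class LatestPerWindow:
    """Hypothetical downstream sink: keeps only the newest result per
    (key, window end), so a late firing overwrites the earlier firing."""

    def __init__(self):
        self.results = {}

    def upsert(self, key, window_end, value):
        self.results[(key, window_end)] = value

    def total(self, key):
        return sum(v for (k, _), v in self.results.items() if k == key)


sink = LatestPerWindow()
sink.upsert("sensor-1", 12 * 3_600_000, 10)  # on-time firing of 11:00-12:00
sink.upsert("sensor-1", 13 * 3_600_000, 7)   # on-time firing of 12:00-13:00
sink.upsert("sensor-1", 12 * 3_600_000, 12)  # late firing: updated 11:00-12:00 result
print(sink.total("sensor-1"))  # 19, whereas naively appending would give 29
```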

-Eron

In reply to Jagadish, Fri, Dec 22, 2017 at 3:59 PM.