Hey Flink developers,
I'm trying to understand the behavior of timestamp assignment in Apache Flink.

Here's my current understanding:

1. A *TimestampAssigner* returns the timestamp of each incoming message, while a *WatermarkAssigner* emits the watermark for a source. (If per-operator watermarks are defined, they override the watermarks generated from the source.)
2. For each operator, its watermark is determined as the minimum of all the incoming watermarks from its upstream operators.

I'm not sure how the *timestamp* of an operator's output is determined. For instance, for a pane emitted from an *EventTimeTumblingWindow*, what is its "timestamp"?

What is the timestamp of a late arrival?

If I apply another event-time window to the output of a previous window, is that well defined?

If there are pointers to the source code I need to look at, I'd appreciate that as well. Thank you for the help.

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
Each event in the stream has an associated timestamp as metadata. A timestamp assigner simply extracts the timestamp from the user object for that purpose. There is no per-operator watermark, but with `assignTimestampsAndWatermarks` you may insert an operator that overrides upstream watermarks.

Watermarks are markers that flow through the stream to inform operators about the progression of time. Time-sensitive operators (e.g. window operators) use the watermark to take action at specific points in time, such as at the time boundary of a window.

When a window function is invoked for a given window, any elements emitted by the function take the 'max timestamp' of the window. For example, if a given window spans 11:00 to 12:00, any elements produced by the window function will have a timestamp of 12:00. The window function is fired when the current time (as indicated by the watermark) reaches 12:00.

Now, imagine that an event with a timestamp of, say, 11:55 arrives after the watermark has passed 12:00. That record would be considered late, but logically it still belongs in the 11:00-12:00 window. Assuming the window operator was configured with an allowed lateness of at least 5 minutes, the window would be re-evaluated, and any elements produced would again have a timestamp of 12:00.

You can chain window operators together. For example, we described an "hourly" window above. A subsequent "daily" window could further aggregate the results of each hour. If a late firing were to occur in the "hourly" window operator, the subsequent "daily" window operator would observe a late element and apply its own lateness logic.

Hope this helps!
Eron

On Wed, Dec 20, 2017 at 11:10 PM, Jagadish Venkatraman <[hidden email]> wrote:
> I was not sure on how the *timestamp* of the output from an operator is
> determined?
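The firing rules described in the reply above can be illustrated with a small, self-contained Java model. This is a hypothetical toy: the class `TumblingWindowSketch` and its methods are invented for illustration and are not Flink's API. It assumes a single key, a count aggregate, millisecond timestamps, and a non-decreasing watermark. It mirrors the behavior of Flink's default `EventTimeTrigger`: fire once the watermark reaches the window's max timestamp, fire again for each late element that is still within the allowed lateness, and drop elements once the watermark has passed max timestamp plus allowed lateness. In Flink itself the max timestamp is `TimeWindow.maxTimestamp()`, which is the exclusive window end minus 1 ms, so the "12:00" above is strictly 11:59:59.999. The Flink-side configuration would look roughly like `.window(TumblingEventTimeWindows.of(Time.hours(1))).allowedLateness(Time.minutes(5))`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical toy model (NOT Flink's API) of one key's event-time tumbling
 * window with allowed lateness, counting elements per window.
 */
class TumblingWindowSketch {
    static final long MINUTE = 60_000L, HOUR = 60 * MINUTE;

    /** One firing: window bounds, the timestamp given to the output, and the count. */
    record Result(long start, long end, long timestamp, long count) {}

    final long size;
    final long allowedLateness;
    long watermark = Long.MIN_VALUE;                 // assumed non-decreasing
    final Map<Long, Long> counts = new TreeMap<>();  // window start -> element count
    final List<Result> emitted = new ArrayList<>();
    final List<Long> dropped = new ArrayList<>();    // elements beyond allowed lateness

    TumblingWindowSketch(long size, long allowedLateness) {
        this.size = size;
        this.allowedLateness = allowedLateness;
    }

    /** Largest timestamp inside [start, start + size); the end is exclusive. */
    long maxTimestamp(long start) { return start + size - 1; }

    void onElement(long ts) {
        long start = ts - Math.floorMod(ts, size);
        if (maxTimestamp(start) + allowedLateness <= watermark) {
            dropped.add(ts);  // window state already cleaned up: element is dropped
            return;
        }
        counts.merge(start, 1L, Long::sum);
        if (maxTimestamp(start) <= watermark) {
            fire(start);      // late but within allowed lateness: late firing
        }
    }

    void onWatermark(long wm) {
        for (long start : counts.keySet()) {
            long maxTs = maxTimestamp(start);
            if (maxTs > watermark && maxTs <= wm) fire(start);  // on-time firing
        }
        watermark = wm;
        // Clean up windows whose allowed lateness has fully elapsed.
        counts.keySet().removeIf(start -> maxTimestamp(start) + allowedLateness <= wm);
    }

    private void fire(long start) {
        // Every output of a firing carries the window's max timestamp.
        emitted.add(new Result(start, start + size, maxTimestamp(start), counts.get(start)));
    }

    public static void main(String[] args) {
        TumblingWindowSketch w = new TumblingWindowSketch(HOUR, 5 * MINUTE);
        w.onElement(11 * HOUR + 10 * MINUTE);
        w.onElement(11 * HOUR + 30 * MINUTE);
        w.onWatermark(12 * HOUR);                // on-time firing of [11:00, 12:00)
        w.onElement(11 * HOUR + 55 * MINUTE);    // late, but within 5 minutes lateness
        w.onWatermark(12 * HOUR + 10 * MINUTE);  // lateness elapsed: state cleaned up
        w.onElement(11 * HOUR + 58 * MINUTE);    // too late: dropped
        w.emitted.forEach(System.out::println);
        System.out.println("dropped: " + w.dropped.size());
    }
}
```

Running `main` prints two results for the [11:00, 12:00) window, both with timestamp 43199999 (11:59:59.999 in milliseconds since midnight), with counts 2 and 3, followed by one dropped element. The late firing reuses the window's max timestamp, which is why a downstream event-time window can still place it correctly.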
Thanks for the super-helpful reply!
Would the late firing from the "hourly" window be considered an on-time arrival for the subsequent "daily" window operator? Since its window duration is much longer (1 day), I'd expect the on-time (default) result emitted from the "daily" window to include the late firings of the previous hourly windows. Is that right?

--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
That is correct: the window operator happily accepts late elements into the applicable window(s). Though the element is late (with respect to the current watermark), it might still be included in the main firing, or provoke a late firing.

Elements flowing downstream of an operator that uses late firing must be de-duplicated by some means. Other solutions: leverage the side output feature of the window operator, or be more conservative with your watermarks (decreasing complexity at the expense of latency).

-Eron

On Fri, Dec 22, 2017 at 3:59 PM, Jagadish <[hidden email]> wrote:
> Would the late-firing from the "hourly" window be considered an on-time
> arrival for the subsequent "daily" window operator?
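One way to do the de-duplication mentioned above is to treat every hourly firing as a replacement for any earlier firing of the same hour. This relies on each firing carrying the full accumulated result for its window, which holds for Flink's default `EventTimeTrigger` because it fires without purging the window state. The Java sketch below is hypothetical: `DailyDedupSketch` and `HourlyResult` are invented names, not Flink classes, and it assumes firings for a given hour arrive in order. It contrasts a naive daily sum, which double-counts a late firing, with an upsert keyed by the hour's start.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (NOT a Flink API): a "daily" consumer of hourly window
 * results that de-duplicates late firings by upserting per hourly window.
 */
class DailyDedupSketch {
    static final long HOUR = 3_600_000L, DAY = 24 * HOUR;

    /** One firing of an hourly window: its start and the full count accumulated so far. */
    record HourlyResult(long hourStart, long count) {}

    /** Naive: adds every firing, so an hour that fired twice is counted twice. */
    static long naiveDailyTotal(List<HourlyResult> firings) {
        long total = 0;
        for (HourlyResult r : firings) total += r.count();
        return total;
    }

    /** De-duplicated: a later firing for the same hour replaces the earlier one. */
    static Map<Long, Long> dedupedDailyTotals(List<HourlyResult> firings) {
        Map<Long, Long> latestPerHour = new HashMap<>();
        for (HourlyResult r : firings) {
            latestPerHour.put(r.hourStart(), r.count());  // upsert keyed by hour
        }
        Map<Long, Long> perDay = new HashMap<>();
        for (Map.Entry<Long, Long> e : latestPerHour.entrySet()) {
            long dayStart = e.getKey() - Math.floorMod(e.getKey(), DAY);
            perDay.merge(dayStart, e.getValue(), Long::sum);
        }
        return perDay;
    }

    public static void main(String[] args) {
        List<HourlyResult> firings = List.of(
            new HourlyResult(10 * HOUR, 4),   // on-time firing for [10:00, 11:00)
            new HourlyResult(11 * HOUR, 2),   // on-time firing for [11:00, 12:00)
            new HourlyResult(11 * HOUR, 3));  // late firing for the same hour, updated count
        System.out.println("naive: " + naiveDailyTotal(firings));
        System.out.println("dedup: " + dedupedDailyTotals(firings));
    }
}
```

Here `naiveDailyTotal` returns 9 because hour 11 is counted twice (2 + 3), while the de-duplicated map holds 7 (4 + 3) for the day starting at 0. The side output alternative mentioned in the reply avoids this bookkeeping by routing late elements to a separate stream instead of re-firing the window.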