Hi,
I am using Flink 1.1.3 and the following example doesn't work for me as expected.

I've got three input elements with the same timestamp (equal to the window's maxTimestamp). I'm using event time with TumblingEventTimeWindows.

I would expect all three elements to be processed in the same window, because they all have an identical event-time timestamp. But the result I'm getting contains just the first element, the one that triggers the window. The remaining elements are considered latecomers and discarded.

From my point of view this is definitely not correct and should be fixed. Could you clarify whether this is correct behavior or a bug?

I think the problem is in WindowOperator#processWatermark. A timer should be fired if and only if the current watermark is strictly larger than the registered timer.

    Timer<K, W> timer = watermarkTimersQueue.peek();
    if (timer != null && timer.timestamp <= mark.getTimestamp()) {

Thanks
Jaromir Vanek

    import java.time.Instant;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class WindowingTest {

      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Three elements sharing one timestamp: the last millisecond of a one-minute window.
        List<Tuple2<Instant, Integer>> elements = Arrays.asList(
            new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 100),
            new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 200),
            new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 300)
        );

        DataStreamSource<Tuple2<Instant, Integer>> input = env.fromCollection(elements);

        SingleOutputStreamOperator<Tuple2<Instant, Integer>> timestamped =
            input.assignTimestampsAndWatermarks(new PunctuatedAssigner());

        timestamped.timeWindowAll(Time.minutes(1))
            .sum(1)
            .print();

        // printed result
        // (2016-12-19T10:59:59.999Z,100)

        env.execute();
      }

      private static class PunctuatedAssigner
          implements AssignerWithPunctuatedWatermarks<Tuple2<Instant, Integer>> {

        @Override
        public long extractTimestamp(Tuple2<Instant, Integer> element, long previousElementTimestamp) {
          return element.f0.toEpochMilli();
        }

        // Emits a watermark equal to the element's own timestamp after every element.
        @Override
        public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer> lastElement, long extractedTimestamp) {
          return new Watermark(extractedTimestamp);
        }
      }
    }
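To see why 10:59:59.999 is exactly the window's maxTimestamp, the window bounds can be worked out by hand. The following is only a stand-alone sketch with no Flink dependency: the class and method names are made up for illustration, and it assumes one-minute tumbling windows aligned to the epoch with no offset, a window end that is exclusive, and a maxTimestamp of "end - 1".

```java
import java.time.Instant;

/** Stand-alone sketch (no Flink dependency); class and method names are hypothetical. */
class TumblingWindowBoundsSketch {

    /** Start of the tumbling window containing the timestamp (assumes non-negative timestamps, no offset). */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Largest timestamp that still belongs to the window: the exclusive end minus one millisecond. */
    static long windowMaxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L; // one minute in milliseconds
        long ts = Instant.parse("2016-12-19T10:59:59.999Z").toEpochMilli();

        System.out.println(Instant.ofEpochMilli(windowStart(ts, size)));        // 2016-12-19T10:59:00Z
        System.out.println(Instant.ofEpochMilli(windowMaxTimestamp(ts, size))); // 2016-12-19T10:59:59.999Z
        System.out.println(windowMaxTimestamp(ts, size) == ts);                 // true
    }
}
```

Under these assumptions every element in the example sits on the last millisecond of its window, so a watermark equal to the element timestamp already reaches the window's timer.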
Hi Jaromir,
thank you very much for reporting this issue.
The behavior you are describing is not in line with the documentation of watermarks [1], which clearly states that a watermark of time t tells the system that no more events with a timestamp < t will occur (otherwise they would be considered late events). Hence, events with a timestamp = t, as in your case, should be OK and not be considered late.

I think this is not intended and probably a bug.

I'll loop in some contributors who are more familiar with watermarks and event-time (cc Aljoscha, Kostas K, Stephan).

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_time.html#event-time-and-watermarks

2016-12-20 14:56 GMT+01:00 Jaromir Vanek <[hidden email]>:
> [...]
I'm afraid the doc is wrong here. The JavaDoc on Watermark says this about watermarks:

"A Watermark tells operators that receive it that no elements with a timestamp older or equal to the watermark timestamp should arrive at the operator."

The system also relies on this fact, as visible in how timers are read from the watermark timers queue and in AscendingTimestampExtractor, which has this code:

    public final Watermark getCurrentWatermark() {
      return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
    }

Notice how the watermark is "currentTimestamp - 1", where currentTimestamp is the highest timestamp seen so far and where we assume monotonically ascending timestamps.

Cheers,
Aljoscha

On Tue, 20 Dec 2016 at 15:28 Fabian Hueske <[hidden email]> wrote:
> [...]
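The effect of these semantics on the example from the first post can be illustrated with a small simulation. This is a sketch only, not Flink code: the class and method names are hypothetical, it models a single window, it treats an element as late once the current watermark has reached the window's maxTimestamp (the "older or equal" rule quoted above), and it assumes any window still open at the end of the input is fired then.

```java
import java.time.Instant;
import java.util.function.LongUnaryOperator;

/** Stand-alone sketch (no Flink dependency); all names here are hypothetical. */
class WatermarkSemanticsSketch {

    /**
     * Feeds elements into one window and returns the sum that window emits.
     * After each accepted element a punctuated watermark is computed by watermarkFor.
     */
    static int sumOfWindow(long[] timestamps, int[] values, long windowMaxTimestamp,
                           LongUnaryOperator watermarkFor) {
        long currentWatermark = Long.MIN_VALUE;
        int sum = 0;
        for (int i = 0; i < timestamps.length; i++) {
            // "Older or equal": once the watermark reaches maxTimestamp, the window is closed.
            if (windowMaxTimestamp <= currentWatermark) {
                continue; // element is late and dropped
            }
            sum += values[i];
            currentWatermark = Math.max(currentWatermark, watermarkFor.applyAsLong(timestamps[i]));
        }
        // Assumption: a window that is still open fires at the end of the input.
        return sum;
    }

    public static void main(String[] args) {
        long t = Instant.parse("2016-12-19T10:59:59.999Z").toEpochMilli();
        long[] timestamps = {t, t, t};
        int[] values = {100, 200, 300};

        // Watermark equal to the element timestamp, as in the PunctuatedAssigner of the first post.
        System.out.println(sumOfWindow(timestamps, values, t, ts -> ts));     // 100
        // Watermark of "timestamp - 1", as in AscendingTimestampExtractor.
        System.out.println(sumOfWindow(timestamps, values, t, ts -> ts - 1)); // 600
    }
}
```

With the watermark set to the element's own timestamp, the first element already closes the window; with "timestamp - 1" the window stays open for the other two elements that share that timestamp.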
Thanks for the clarification Aljoscha.
I added https://issues.apache.org/jira/browse/FLINK-5375 to fix this issue.

Best, Fabian

2016-12-20 17:58 GMT+01:00 Aljoscha Krettek <[hidden email]>:
> [...]
Aljoscha, thank you very much for the explanation.

It seems that using AscendingTimestampExtractor would really solve my problem, because emitting a watermark of "currentTimestamp - 1" is the correct way to wait for all elements with an identical timestamp.

But I can see this is not true for BoundedOutOfOrdernessTimestampExtractor, where the watermark is used as is, without the "-1".

    public final Watermark getCurrentWatermark() {
      // this guarantees that the watermark never goes backwards.
      long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
      if (potentialWM >= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWM;
      }
      return new Watermark(lastEmittedWatermark);
    }

I think those two implementations should use the same principle.
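One way the two extractors could follow the same principle is to subtract one more millisecond in the bounded-out-of-orderness case. The class below is a hypothetical stand-alone sketch, not Flink's BoundedOutOfOrdernessTimestampExtractor and not necessarily how the project resolved this: the names are invented, and the MIN_VALUE guard is an assumption added here to avoid arithmetic underflow before any element has been seen.

```java
/** Hypothetical stand-alone sketch (no Flink dependency) of a watermark with the extra "- 1". */
class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp = Long.MIN_VALUE;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        if (maxOutOfOrderness < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness must not be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Records the timestamp of an incoming element. */
    void onElement(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    /** Watermark that still admits elements with timestamp == currentMaxTimestamp - maxOutOfOrderness. */
    long currentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no element seen yet; avoids underflow
        }
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness - 1;
        // The watermark never goes backwards.
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(1_000L);
        sketch.onElement(5_000L);
        System.out.println(sketch.currentWatermark()); // 3999
        sketch.onElement(4_500L);                      // out of order, does not lower the watermark
        System.out.println(sketch.currentWatermark()); // 3999
        sketch.onElement(7_000L);
        System.out.println(sketch.currentWatermark()); // 5999
    }
}
```

With a bound of 1000 ms and a maximum seen timestamp of 5000, the watermark is 3999, so an element with timestamp 4000 is still on time under the "older or equal" rule.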
Yes, it would seem that the bounded-out-of-orderness extractor has an off-by-one error. We should fix it.

In most practical cases these errors should not change results by much, however (IMHO).

Cheers,
Aljoscha

On Wed, 21 Dec 2016 at 22:43 Jaromir Vanek <[hidden email]> wrote:
> [...]