Flink gives incorrect result when event time windowing used

Jaromir Vanek
Hi,

I am using Flink 1.1.3, and the following example doesn't work as I expect.

I have three input elements with the same timestamp (equal to the window's maxTimestamp). I'm using event time with TumblingEventTimeWindows.

I would expect all three elements to be processed in the same window, because they all have an identical event-time timestamp. But the result I get contains only the first element, which triggers the window. The remaining elements are considered late and discarded.

From my point of view this is not correct and should be fixed. Could you clarify whether this is the intended behavior or a bug?

I think the problem is in WindowOperator#processWatermark. A timer should fire if and only if the current watermark is strictly greater than the timer's timestamp:


Timer<K, W> timer = watermarkTimersQueue.peek();
if (timer != null && timer.timestamp <= mark.getTimestamp()) {
    // ... the timer fires here (rest of the method omitted)
}


Thanks
Jaromir Vanek


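import java.time.Instant;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowingTest {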

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment();

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    List<Tuple2<Instant, Integer>> elements = Arrays.asList(
            new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 100),
            new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 200),
            new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 300)
    );

    DataStreamSource<Tuple2<Instant, Integer>> input = env.fromCollection(elements);

    SingleOutputStreamOperator<Tuple2<Instant, Integer>> timestamped =
            input.assignTimestampsAndWatermarks(new PunctuatedAssigner());

    timestamped.timeWindowAll(Time.minutes(1))
         .sum(1)
         .print();

    // actual printed result (the elements with values 200 and 300 are dropped as late):
    // (2016-12-19T10:59:59.999Z,100)

    env.execute();
  }

  private static class PunctuatedAssigner
          implements AssignerWithPunctuatedWatermarks<Tuple2<Instant, Integer>> {

    @Override
    public long extractTimestamp(Tuple2<Instant, Integer> element, long previousElementTimestamp) {
      return element.f0.toEpochMilli();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer> lastElement, long extractedTimestamp) {
      return new Watermark(extractedTimestamp);
    }
  }
}
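To see why only 100 is printed, here is a minimal, Flink-free model of the scenario. It is a sketch under simplifying assumptions, not Flink's WindowOperator: the class WindowFiringModel and its methods are hypothetical, and only the rules discussed in this thread are modeled (one-minute tumbling windows, a punctuated watermark equal to each element's timestamp, a window timer at maxTimestamp, elements dropped once the watermark has reached their window, and end of input treated as a final Long.MAX_VALUE watermark). With strict = true, both the firing check and the lateness check use "<" instead of "<="; the proposal above only mentions the timer check, so also changing the lateness check is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free model of the scenario above; not Flink's WindowOperator.
public class WindowFiringModel {

    static final long WINDOW_SIZE_MS = 60_000L; // Time.minutes(1)

    // Has the watermark "reached" a window? Current rule: maxTimestamp <= watermark.
    // strict == true (assumption): maxTimestamp < watermark, for firing AND lateness.
    static boolean reached(long maxTimestamp, long watermark, boolean strict) {
        return strict ? maxTimestamp < watermark : maxTimestamp <= watermark;
    }

    // Returns the sums emitted by fired windows, in firing order.
    public static List<Long> run(long[] timestamps, long[] values, boolean strict) {
        Map<Long, Long> openWindows = new TreeMap<>(); // window maxTimestamp -> running sum
        List<Long> fired = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            long ts = timestamps[i];
            // Tumbling window [start, start + size); its maxTimestamp is end - 1.
            long maxTimestamp = ts - Math.floorMod(ts, WINDOW_SIZE_MS) + WINDOW_SIZE_MS - 1;
            if (reached(maxTimestamp, watermark, strict)) {
                continue; // watermark already reached this window: element dropped as late
            }
            openWindows.merge(maxTimestamp, values[i], Long::sum);
            // The punctuated assigner emits a watermark equal to the element's timestamp.
            watermark = Math.max(watermark, ts);
            // Fire (and discard) every window whose timer the watermark has reached.
            Iterator<Map.Entry<Long, Long>> it = openWindows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> window = it.next();
                if (reached(window.getKey(), watermark, strict)) {
                    fired.add(window.getValue());
                    it.remove();
                }
            }
        }
        // End of a bounded input, modeled as a final Long.MAX_VALUE watermark
        // that fires whatever is left.
        fired.addAll(openWindows.values());
        return fired;
    }

    public static void main(String[] args) {
        long ts = java.time.Instant.parse("2016-12-19T10:59:59.999Z").toEpochMilli();
        long[] timestamps = {ts, ts, ts};
        long[] values = {100, 200, 300};
        System.out.println("current rule: " + run(timestamps, values, false)); // [100]
        System.out.println("strict rule:  " + run(timestamps, values, true));  // [600]
    }
}
```

Under the current rule the model reproduces the reported output: the first element's watermark equals the window's maxTimestamp, the window fires with 100, and the next two elements fall into an already-fired window.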

Re: Flink gives incorrect result when event time windowing used

Fabian Hueske-2
Hi Jaromir,

Thank you very much for reporting this issue.
The behavior you are describing is not in line with the documentation of
watermarks [1], which clearly states that a watermark of time t tells the
system that no more events with a timestamp < t will occur (otherwise they
would be considered late events). Hence, events with a timestamp = t, as
in your case, should be fine and not be considered late.
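The difference between the documented reading and the behavior observed in the report comes down to a single comparison. As a hypothetical illustration (plain Java, not Flink API; the class, the method names and the sample watermark t = 1000 are made up), the two readings of a watermark t can be written as lateness predicates; they disagree only for an element whose timestamp equals t.

```java
// Hypothetical helper (not Flink API) contrasting two readings of a watermark t.
public class WatermarkContract {

    // Documented reading: no more elements with timestamp < t, so only those are late.
    public static boolean lateUnderDocumentedReading(long timestamp, long watermark) {
        return timestamp < watermark;
    }

    // Stricter reading, matching the behavior in the report: timestamp <= t is late.
    public static boolean lateUnderStrictReading(long timestamp, long watermark) {
        return timestamp <= watermark;
    }

    public static void main(String[] args) {
        long t = 1_000L;
        for (long ts = t - 1; ts <= t + 1; ts++) {
            System.out.printf("ts=%d documented late=%b strict late=%b%n",
                    ts, lateUnderDocumentedReading(ts, t), lateUnderStrictReading(ts, t));
        }
    }
}
```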

I think this is not intended and probably a bug.

I'll loop in some contributors who are more familiar with watermarks and
event-time (cc Aljoscha, Kostas K, Stephan).

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_time.html#event-time-and-watermarks

In reply to Jaromir Vanek's message of 2016-12-20 14:56 GMT+01:00.

Re: Flink gives incorrect result when event time windowing used

Aljoscha Krettek-2
I'm afraid the doc is wrong here. The JavaDoc on Watermark says this about
watermarks:

"A Watermark tells operators that receive it that no elements with a
timestamp older or equal to the watermark timestamp should arrive at the
operator."

The system also relies on this fact, as visible in how timers are read from
the watermark timers queue and in AscendingTimestampExtractor, which has
this code:

public final Watermark getCurrentWatermark() {
    return new Watermark(currentTimestamp == Long.MIN_VALUE ?
            Long.MIN_VALUE : currentTimestamp - 1);
}

Notice how the watermark is "currentTimestamp - 1", where currentTimestamp
is the highest timestamp seen so far and timestamps are assumed to be
monotonically ascending.
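The effect of the "- 1" can be sketched without Flink. The class below is hypothetical: it copies the watermark expression quoted above (including the Long.MIN_VALUE guard, which keeps Long.MIN_VALUE - 1 from wrapping around to Long.MAX_VALUE), but it simply keeps the maximum timestamp seen instead of reproducing whatever the real extractor does when timestamps are not ascending. Fed the three elements from the report, the watermark stays one millisecond below the window's maxTimestamp, so under the JavaDoc contract the window does not fire early and none of the elements is late. Presumably the analogous change to the PunctuatedAssigner in the original report would be to return new Watermark(extractedTimestamp - 1).

```java
// Hypothetical, Flink-free sketch of the quoted AscendingTimestampExtractor watermark rule.
public class AscendingWatermarkSketch {

    private long currentTimestamp = Long.MIN_VALUE; // highest timestamp seen so far

    public void onElement(long timestamp) {
        // Assumption: keep the maximum; the real extractor's handling of
        // non-ascending timestamps is not modeled.
        currentTimestamp = Math.max(currentTimestamp, timestamp);
    }

    public long currentWatermark() {
        // Same expression as quoted above; the guard keeps Long.MIN_VALUE - 1
        // from wrapping around to Long.MAX_VALUE.
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    public static void main(String[] args) {
        long ts = java.time.Instant.parse("2016-12-19T10:59:59.999Z").toEpochMilli();
        long windowMaxTimestamp = ts; // the elements sit exactly on the window's maxTimestamp
        AscendingWatermarkSketch extractor = new AscendingWatermarkSketch();
        for (int i = 0; i < 3; i++) {
            extractor.onElement(ts);
            long watermark = extractor.currentWatermark();
            // JavaDoc contract: the window's timer fires once maxTimestamp <= watermark.
            System.out.println("watermark=" + watermark
                    + " windowFired=" + (windowMaxTimestamp <= watermark)); // false each time
        }
    }
}
```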

Cheers,
Aljoscha

In reply to Fabian Hueske's message of Tue, 20 Dec 2016 at 15:28.

Re: Flink gives incorrect result when event time windowing used

Fabian Hueske-2
Thanks for the clarification, Aljoscha.

I created https://issues.apache.org/jira/browse/FLINK-5375 to fix this issue.

Best, Fabian

In reply to Aljoscha Krettek's message of 2016-12-20 17:58 GMT+01:00.

Re: Flink gives incorrect result when event time windowing used

Jaromir Vanek
In reply to this post by Aljoscha Krettek-2
Aljoscha, thank you very much for the explanation.

It seems that using AscendingTimestampExtractor would really solve my problem, because emitting a watermark of "currentTimestamp - 1" is the correct way to wait for all elements with an identical timestamp.

But this is not the case for BoundedOutOfOrdernessTimestampExtractor, where the watermark is emitted as is, without the "-1":

public final Watermark getCurrentWatermark() {
  // this guarantees that the watermark never goes backwards.
  long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
  if (potentialWM >= lastEmittedWatermark) {
    lastEmittedWatermark = potentialWM;
  }
  return new Watermark(lastEmittedWatermark);
}

I think those two implementations should follow the same principle.
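The off-by-one can be made concrete with a small Flink-free sketch. BoundedWatermarkSketch is hypothetical: it follows the quoted getCurrentWatermark() logic, and its extraLag parameter (an assumption, not an existing Flink option) subtracts one more millisecond, in the spirit of AscendingTimestampExtractor. With a bound of 10 ms and a maximum timestamp of 100, an element at 90 is still within the allowed out-of-orderness, yet the quoted logic emits watermark 90, which under the JavaDoc contract already declares that element late; the shifted variant emits 89.

```java
// Hypothetical, Flink-free sketch of the quoted BoundedOutOfOrdernessTimestampExtractor
// watermark logic, plus an assumed extraLag knob (not a Flink option).
public class BoundedWatermarkSketch {

    private final long maxOutOfOrderness;
    private final long extraLag; // 0 = quoted logic, 1 = hypothetical "-1" variant
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public BoundedWatermarkSketch(long maxOutOfOrderness, long extraLag) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.extraLag = extraLag;
        // Start low enough that the first subtraction cannot wrap around.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + extraLag;
    }

    public void onElement(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    public long currentWatermark() {
        // As in the quoted code, the watermark never goes backwards.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness - extraLag;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        long bound = 10L;
        long boundaryElement = 100L - bound; // exactly maxOutOfOrderness behind the max
        for (long lag : new long[] {0L, 1L}) {
            BoundedWatermarkSketch extractor = new BoundedWatermarkSketch(bound, lag);
            extractor.onElement(100L);
            long watermark = extractor.currentWatermark();
            System.out.println("extraLag=" + lag + " watermark=" + watermark
                    + " boundaryElementLate=" + (boundaryElement <= watermark));
        }
    }
}
```

With extraLag = 0 the sketch prints watermark=90 and boundaryElementLate=true; with extraLag = 1 it prints watermark=89 and boundaryElementLate=false.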

Re: Flink gives incorrect result when event time windowing used

Aljoscha Krettek-2
Yes, it would seem that the bounded-out-of-orderness extractor has an
off-by-one error. We should fix it. In most practical cases, however, these
errors should not change the results by much (IMHO).

Cheers,
Aljoscha

In reply to Jaromir Vanek's message of Wed, 21 Dec 2016 at 22:43.
