[DISCUSS] Watermark boundary condition

Eron Wright-2
Hello,

I think I see a bug in a few places related to determining whether an input
is to be considered late.  Some components use the logic that (timestamp <=
watermark) is considered late.  Others use (timestamp < watermark).  I
think the former is correct according to the definition in Watermark.java.
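
To make the boundary case concrete, here is a minimal sketch of the two comparisons. It is not Flink code: the class and method names are hypothetical and only illustrate the logic described above. The two checks disagree only when the timestamp equals the watermark.

```java
public class LatenessBoundary {

    // Inclusive check: an element whose timestamp equals the watermark is late.
    // This is the variant described above as matching Watermark.java.
    static boolean isLateInclusive(long timestamp, long watermark) {
        return timestamp <= watermark;
    }

    // Exclusive check: an element whose timestamp equals the watermark is NOT late.
    // This is the variant suspected to be a bug.
    static boolean isLateExclusive(long timestamp, long watermark) {
        return timestamp < watermark;
    }

    public static void main(String[] args) {
        long watermark = 100L; // arbitrary illustrative value
        for (long ts : new long[] {99L, 100L, 101L}) {
            System.out.println("ts=" + ts
                    + " inclusive=" + isLateInclusive(ts, watermark)
                    + " exclusive=" + isLateExclusive(ts, watermark));
        }
    }
}
```

Only the middle case (timestamp equal to the watermark) is classified differently, so a component using the exclusive check would still accept an element that the other components already treat as late.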

Compare:
https://github.com/apache/flink/blob/df7452d9811b0aa88919d7e3c1f6c34b36ac9b38/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L574

https://github.com/apache/flink/blob/df7452d9811b0aa88919d7e3c1f6c34b36ac9b38/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala#L116

https://github.com/apache/flink/blob/df7452d9811b0aa88919d7e3c1f6c34b36ac9b38/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java#L168

I might be misunderstanding the above snippets; it was only a cursory look.

-Eron

Re: [DISCUSS] Watermark boundary condition

Aljoscha Krettek-2
Hi,

Excellent, thanks for pointing this out! The definition in Watermark.java, which is used by HeapInternalTimerService.java, WindowOperator.isWindowLate() and WindowOperator.isElementLate(), and is also documented in [1], is the definitive (ha!) definition.

I created Jira issues for fixing these:
 - FLINK-7563 - Fix watermark semantics in CEP operators <https://issues.apache.org/jira/browse/FLINK-7563>
 - FLINK-7564 - Fix Watermark semantics in Table API <https://issues.apache.org/jira/browse/FLINK-7564>

Best,
Aljoscha

> On 30. Aug 2017, at 20:37, Eron Wright <[hidden email]> wrote:
>
> Hello,
>
> I think I see a bug in a few places related to determining whether an input
> is to be considered late.  Some components use the logic that (timestamp <=
> watermark) is considered late.  Others use (timestamp < watermark).  I
> think the former is correct according to the definition in Watermark.java.
>
> Compare:
> https://github.com/apache/flink/blob/df7452d9811b0aa88919d7e3c1f6c34b36ac9b38/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L574
>
> https://github.com/apache/flink/blob/df7452d9811b0aa88919d7e3c1f6c34b36ac9b38/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala#L116
>
> https://github.com/apache/flink/blob/df7452d9811b0aa88919d7e3c1f6c34b36ac9b38/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java#L168
>
> I might be misunderstanding the above snippets; it was only a cursory look.
>
> -Eron