Piotr Nowojski created FLINK-12872:
--------------------------------------
Summary: WindowOperator may fail with UnsupportedOperationException when merging windows
Key: FLINK-12872
URL: https://issues.apache.org/jira/browse/FLINK-12872
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.8.0, 1.7.2, 1.6.4
Reporter: Piotr Nowojski
[Reported|http://mail-archives.apache.org/mod_mbox/flink-user/201906.mbox/%3CCALDWsfhbP6D9+pnTzYuGaP0V4nReKJ4s9VsG_Xe1hZJq4O=z9g@...%3E] by a user.
{noformat}
I have a job that uses processing time session window with inactivity gap of 60ms where I intermittently run into the following exception. I'm trying to figure out what happened here. Haven't been able to reproduce this scenario. Any thoughts?
java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1560493731808 window: TimeWindow{start=1560493731654, end=1560493731778}
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
{noformat}
This is probably happening because {{System.currentTimeMillis()}} is not a monotonic function and {{WindowOperator}} reads it at least twice: once when it creates a window, and again when it performs the check that failed above. I would guess there are more places like this, not only in {{WindowOperator}}.
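To make the two-read hazard concrete, here is a minimal Java sketch. {{SessionWindowSketch}} and its methods are hypothetical stand-ins, not Flink code, and {{endsInPast}} is only a simplified version of the check in the stack trace. In this sketch the check trips whenever the second clock reading is later than the first by more than the session gap; a single reading reused for both steps cannot trip it as long as the gap is positive.

```java
import java.util.function.LongSupplier;

/** Simplified stand-in for the window bookkeeping; NOT Flink's actual code. */
final class SessionWindowSketch {

    /** Simplified form of the failing check: the window must not already be over. */
    static boolean endsInPast(long windowEnd, long now) {
        return windowEnd <= now;
    }

    /** Reads the clock twice: once to build the window, once to validate it. */
    static boolean failsWithTwoReads(LongSupplier clock, long gapMillis) {
        long windowEnd = clock.getAsLong() + gapMillis;   // first read
        return endsInPast(windowEnd, clock.getAsLong());  // second read
    }

    /** Reads the clock once and reuses that value for both steps. */
    static boolean failsWithOneRead(LongSupplier clock, long gapMillis) {
        long now = clock.getAsLong();                     // only read
        return endsInPast(now + gapMillis, now);
    }
}
```

With a 60 ms gap, two readings that are 90 ms apart make {{failsWithTwoReads}} return true, while {{failsWithOneRead}} returns false for the same clock.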
The fix could be to make sure that processing time is monotonic, to access it only once per operator per record, or to drop processing time in favour of ingestion time.
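The first option could be sketched as a thin wrapper around the wall clock. {{MonotonicClock}} is a hypothetical helper, not an existing Flink class, and how it would be wired into the timer service is left open. It only guards against the clock stepping backwards; it does nothing about a long delay between two reads.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical helper (not part of Flink): never returns a smaller value than before. */
final class MonotonicClock implements LongSupplier {

    private final LongSupplier source;
    // Largest value handed out so far; starts below any real timestamp.
    private final AtomicLong last = new AtomicLong(Long.MIN_VALUE);

    MonotonicClock(LongSupplier source) {
        this.source = source;
    }

    @Override
    public long getAsLong() {
        // Atomically keep the maximum of the new reading and the previous result.
        return last.accumulateAndGet(source.getAsLong(), Math::max);
    }
}
```

A caller would wrap the real clock once, for example {{new MonotonicClock(System::currentTimeMillis)}}, and read time only through the wrapper.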