Xiaojun Jin created FLINK-5947:
----------------------------------
Summary: NullPointerException in ContinuousProcessingTimeTrigger.clear()
Key: FLINK-5947
URL: https://issues.apache.org/jira/browse/FLINK-5947
Project: Flink
Issue Type: Bug
Components: DataStream API
Affects Versions: 1.2.0
Reporter: Xiaojun Jin
Priority: Critical
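The fireTimestamp state may hold no value (get() returns null) when ContinuousProcessingTimeTrigger.clear() deletes the processing-time timer. Assigning that null to a primitive long then throws a NullPointerException. The exception stack trace is as follows: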
{quote}
Caused by: java.lang.NullPointerException
at org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger.clear(ContinuousProcessingTimeTrigger.java:89)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:761)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:348)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:336)
at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:210)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:336)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:208)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:70)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:668)
at java.lang.Thread.run(Thread.java:745)
{quote}
The patch is as follows:
{code}
@@ -86,9 +86,10 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
     @Override
     public void clear(W window, TriggerContext ctx) throws Exception {
         ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
-        long timestamp = fireTimestamp.get();
-        ctx.deleteProcessingTimeTimer(timestamp);
-        fireTimestamp.clear();
+        if (fireTimestamp.get() != null) {
+            ctx.deleteProcessingTimeTimer(fireTimestamp.get());
+            fireTimestamp.clear();
+        }
     }
{code}
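To illustrate the failure mode outside of Flink, here is a minimal, self-contained sketch. FakeState and FakeTimers are hypothetical stand-ins for ReducingState<Long> and the timer part of TriggerContext; they are not Flink classes. The sketch assumes only what the patch implies: that get() returns null when no fire timestamp has been stored. It also reads the state once into a local variable, which is a small variation on the patch above rather than part of it.

```java
import java.util.ArrayList;
import java.util.List;

public class NullSafeClearSketch {

    /** Hypothetical stand-in for ReducingState<Long>; get() returns null when empty. */
    static final class FakeState {
        private Long value;
        Long get() { return value; }
        void set(Long v) { value = v; }
        void clear() { value = null; }
    }

    /** Hypothetical stand-in for the timer side of TriggerContext; records deletions. */
    static final class FakeTimers {
        final List<Long> deleted = new ArrayList<>();
        void deleteProcessingTimeTimer(long time) { deleted.add(time); }
    }

    /** Mirrors the pre-patch logic: unboxing a null Long into a long throws. */
    static void unsafeClear(FakeState state, FakeTimers timers) {
        long timestamp = state.get(); // NullPointerException when the state is empty
        timers.deleteProcessingTimeTimer(timestamp);
        state.clear();
    }

    /** Mirrors the patched logic, reading the state only once. */
    static void safeClear(FakeState state, FakeTimers timers) {
        Long timestamp = state.get();
        if (timestamp != null) {
            timers.deleteProcessingTimeTimer(timestamp);
            state.clear();
        }
    }

    public static void main(String[] args) {
        FakeTimers timers = new FakeTimers();

        // Empty state: the guarded version is a no-op.
        safeClear(new FakeState(), timers);
        System.out.println("deleted after safeClear on empty state: " + timers.deleted);

        // Empty state: the unguarded version fails on unboxing.
        try {
            unsafeClear(new FakeState(), timers);
        } catch (NullPointerException e) {
            System.out.println("unsafeClear threw NullPointerException");
        }
    }
}
```

With a populated state, both variants delete the stored timer and clear the state; they differ only when no timestamp was ever registered for the window being cleared.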
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)