[jira] [Created] (FLINK-2589) Threads created in TimeTriggerPolicy don't end properly

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-2589) Threads created in TimeTriggerPolicy don't end properly

Shang Yuanchun (Jira)
Arnaud Linz created FLINK-2589:
----------------------------------

             Summary: Threads created in TimeTriggerPolicy don't end properly
                 Key: FLINK-2589
                 URL: https://issues.apache.org/jira/browse/FLINK-2589
             Project: Flink
          Issue Type: Bug
          Components: Streaming
            Reporter: Arnaud Linz
            Priority: Minor


TimeTriggerPolicy uses a thread (TimeCheck) to push fake events in case of time out. However, this threads implements a infinite loop and ignore InterruptExceptions : it never ends properly.

Once created, it continues to push fake events even if the execution is over, polluting the standard error with stacktraces because the fake element post fails. This  especially occurs in unit tests using local clusters, because the JVM does not end.

Stack trace extract :

java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:291)
        at org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:276)
        at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
        at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
        at org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:44)
        at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:57)
        at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:62)
        at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:47)
        at org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:288)
        ... 9 more
Caused by: java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:291)
        at org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:276)
        at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
        at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
        at org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:288)
        ... 17 more
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:37)
        at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
        at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
        at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:43)
        at org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:288)
        ... 22 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:144)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75)
        ... 27 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)