Roman Khachatryan created FLINK-22377:
-----------------------------------------

             Summary: Ignore if writer is stopped when aborting channel state writing
                 Key: FLINK-22377
                 URL: https://issues.apache.org/jira/browse/FLINK-22377
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Network, Runtime / Task
    Affects Versions: 1.13.0
            Reporter: Roman Khachatryan


When a channel state write is being aborted, the writer may already be stopped. In that case, an error is thrown:

{code}
2021-04-19 09:05:52,716 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask Could not perform checkpoint 401 for operator Source: Custom Source (5/6)#0 while the invokable was not in state running.
java.lang.RuntimeException: unable to send request to worker
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:229) ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.abort(ChannelStateWriterImpl.java:190) ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.cleanup(SubtaskCheckpointCoordinatorImpl.java:472) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1062) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1046) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:964) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$7(StreamTask.java:936) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) [flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344) [flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330) [flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202) [flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661) [flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) [flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776) [flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) [flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.lang.IllegalStateException: not running
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:152) ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:144) ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitPriority(ChannelStateWriteRequestExecutorImpl.java:133) ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:224) ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    ... 18 more
{code}

Need to:
- check whether abort calls are valid
- avoid throwing an exception for abort calls when the writer is not running
- add the exception (if any) as a suppressed exception in SubtaskCheckpointCoordinatorImpl.cleanup



--
This message was sent by Atlassian Jira (v8.3.4#803005)
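
The last two items under "Need to" in the description could be sketched roughly as follows. This is only an illustration, not the actual Flink change: SketchExecutor, SketchWriter and SketchCleanup are hypothetical stand-ins for ChannelStateWriteRequestExecutorImpl, ChannelStateWriterImpl and SubtaskCheckpointCoordinatorImpl.cleanup, and their method signatures are assumptions. The first item (checking whether abort calls are valid) is not covered.

```java
// Hypothetical stand-in for the request executor; not Flink's actual class.
final class SketchExecutor {
    private volatile boolean running = true;

    void stop() {
        running = false;
    }

    // Mirrors the "not running" failure seen in the stack trace.
    void submitPriority(String request) {
        if (!running) {
            throw new IllegalStateException("not running");
        }
        // A real executor would enqueue the request for its worker here.
    }
}

// Hypothetical stand-in for the channel state writer.
final class SketchWriter {
    private final SketchExecutor executor;

    SketchWriter(SketchExecutor executor) {
        this.executor = executor;
    }

    // Regular requests still fail loudly if the executor has stopped.
    void enqueue(String request) {
        try {
            executor.submitPriority(request);
        } catch (IllegalStateException e) {
            throw new RuntimeException("unable to send request to worker", e);
        }
    }

    // Abort is best effort: if the executor is already stopped there is
    // nothing left to abort, so the failure is ignored instead of rethrown.
    // Returns true if the abort request was actually submitted.
    boolean abort(long checkpointId) {
        try {
            executor.submitPriority("abort-" + checkpointId);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }
}

// Hypothetical stand-in for the cleanup step of the checkpoint coordinator.
final class SketchCleanup {
    // Runs the cleanup action; a failure during cleanup is attached to the
    // original exception as suppressed rather than replacing it.
    static Exception cleanup(Exception original, Runnable cleanupAction) {
        try {
            cleanupAction.run();
        } catch (Exception cleanupFailure) {
            original.addSuppressed(cleanupFailure);
        }
        return original;
    }
}
```

With this shape, an abort issued after the executor has stopped becomes a no-op, while a failure in any other cleanup action stays visible through getSuppressed() on the original checkpoint exception.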