ming li created FLINK-21990:
-------------------------------

             Summary: Double check Task status when perform checkpoint.
                 Key: FLINK-21990
                 URL: https://issues.apache.org/jira/browse/FLINK-21990
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.11.0
            Reporter: ming li

We need to double-check the task status when performing a checkpoint. Otherwise, a checkpoint may still complete successfully after the task has failed.

For example, I throw an exception at 17:10:24.069; the task thread gets the lock at 17:10:24.070 and starts the checkpoint, and the checkpoint completes at 17:10:24.373.

{code:java}
17:10:24.069 [Legacy Source Thread - Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO org.apache.flink.test.checkpointing.RegionCheckpointITCase - throw expected exception
17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0,5,Flink Task Threads] took 0 ms.
17:10:24.070 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO org.apache.flink.test.checkpointing.RegionCheckpointITCase - sleep 300 ms
17:10:24.372 [Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0] INFO org.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Source_Custom_Source -> Sink_Unnamed (2/4)- execution # 0,5,Flink Task Threads] took 0 ms.
17:10:24.373 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 4a08c4a50d00dfd56f86eb6ccb83b89c (0 bytes in 1137 ms).
{code}

Looking at the code, the task state is checked only once, before the lock is requested. Once the lock is obtained, the checkpoint starts immediately without checking the state again.
{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics) throws Exception {

    if (isRunning) {
        actionExecutor.runThrowing(
            () -> {
                // do checkpoint
            });
        return true;
    } else {
        ...
    }
}
{code}

However, the task state may well change while the task is waiting to acquire the lock. By comparison, the Flink 1.9 code checks the task status after acquiring the lock.

{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics,
        boolean advanceToEndOfTime) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    final long checkpointId = checkpointMetaData.getCheckpointId();

    synchronized (lock) {
        if (isRunning) {
            // do checkpoint
        } else {
            ...
        }
    }
}
{code}

Therefore, I think we need to check the task status again after the lock is acquired, so that a checkpoint cannot succeed when the task has failed while waiting for the lock.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
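
The proposed double check can be illustrated with a minimal, self-contained sketch. This is not Flink code: the class `DoubleCheckedTask`, the method `markFailed`, and the `Runnable` snapshot argument are hypothetical stand-ins, and a `ReentrantLock` is assumed in place of the task's action executor. It only shows the pattern of checking the running flag once before requesting the lock and once more after the lock is held.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a task; not Flink's actual task class. */
class DoubleCheckedTask {
    // Assumption: plays the role of the lock that serializes checkpointing.
    final ReentrantLock lock = new ReentrantLock();

    // Written by the failing thread, read by the checkpointing thread.
    private volatile boolean isRunning = true;

    /** Called when the task fails or is cancelled. */
    void markFailed() {
        isRunning = false;
    }

    /**
     * Runs the snapshot only if the task is running both before and after
     * the lock is acquired. Returns true if the snapshot ran, false if the
     * checkpoint was declined.
     */
    boolean performCheckpoint(Runnable snapshot) {
        if (!isRunning) {
            return false; // first check: cheap early exit without the lock
        }
        lock.lock();
        try {
            if (!isRunning) {
                return false; // second check: state changed while waiting
            }
            snapshot.run();
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

With this shape, a call that passes the first check but has to wait for the lock declines the checkpoint if `markFailed()` runs in the meantime, which is the behaviour the 1.9 code obtained by checking inside `synchronized (lock)`.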