Hey all,
I've been trying to debug a job recovery performance issue and I'm noticing some interesting events in the timeline that seem unexpected to me. Here's a brief outline of the first checkpoint following a job restart:

1. All tasks are deployed and transition into the RUNNING state.
2. I see logs for a subset of initializeState calls ("{} - restoring state" from TwoPhaseCommitSinkFunction).
3. A checkpoint gets triggered: "Triggering checkpoint {} @ {} for job {}."
4. I see more "{} - restoring state" logs.
5. The checkpoint completes: "Completed checkpoint {} for job {} ({} bytes in {} ms)."

The 2 questions I have are:

Are the initializations in 4), in the middle of a checkpoint, expected? Since all the tasks transition to RUNNING in 1), I would think that they are initialized there as well.

Are the initializations in 4) causing the checkpoint to take longer to complete? During the checkpoint, I do see "{} - checkpoint {} complete, committing transaction {} from checkpoint {}" logs (TwoPhaseCommitSinkFunction's notifyCheckpointComplete method), which suggests that the Kafka producers in 2) and 4) are contributing to the checkpoint.

Thanks!

-Teng
Hi Teng,
I think if the system is slowed down enough it can happen that some parts of the graph are still restoring while others are already taking a checkpoint. By virtue of how checkpointing works (by sending barriers along the network connections between tasks) this should not be a problem, though.

It would be good to check in the logs if for all individual tasks it holds that "restoring" comes before "checkpointing".

Best,
Aljoscha
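One way to make that per-task check easy is a small probe that logs from the same CheckpointedFunction hooks TwoPhaseCommitSinkFunction logs from. The sketch below is a minimal, hypothetical example (the class name and log messages are made up; only the Flink interfaces and calls are real), so grepping its two messages per subtask shows whether "restoring" always precedes "checkpointing" for that task.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical probe; pass records through unchanged, only log lifecycle events.
public class RestoreOrderProbe extends RichMapFunction<String, String>
        implements CheckpointedFunction {

    private static final Logger LOG = LoggerFactory.getLogger(RestoreOrderProbe.class);

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // Runs on deployment/restore, before the subtask processes any record or barrier.
        LOG.info("{} - restoring (isRestored={})",
                getRuntimeContext().getTaskNameWithSubtasks(), context.isRestored());
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Runs once the subtask has received the barrier for this checkpoint.
        LOG.info("{} - checkpointing, checkpoint {}",
                getRuntimeContext().getTaskNameWithSubtasks(), context.getCheckpointId());
    }

    @Override
    public String map(String value) {
        return value;
    }
}

It can be attached anywhere in the pipeline, e.g. stream.map(new RestoreOrderProbe()), and the per-subtask timestamps of the two messages compared.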
Hi Teng
As Aljoscha said, this should not be a problem: in the whole graph, some parts can already be processing records while others are still restoring.

For the 2nd question, "Are the initializations in 4) causing the checkpoint to take longer to complete?", the answer is yes. A stream task first transitions into the RUNNING state [1] and then runs initializeStateAndOpenOperators [2] before processing any input record or barrier. In other words, until an operator has finished restoring its state, it cannot process incoming checkpoint barriers.

[1] https://github.com/apache/flink/blob/713d02ef5cc5c668ecaef700257c893201080657/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L711
[2] https://github.com/apache/flink/blob/713d02ef5cc5c668ecaef700257c893201080657/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L496

Best
Yun Tang
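To make the ordering behind [1] and [2] concrete, here is a simplified, hypothetical sketch of a stream task's main path (the class and method names are illustrative, not Flink's actual code, which is behind the two links above): records and checkpoint barriers are only consumed after the restore step returns, so a slow initializeState directly delays the first barrier and the checkpoint built on it.

// Simplified, illustrative sketch only; see links [1] and [2] for the real Flink code.
public class StreamTaskOrderSketch {

    private volatile boolean running = true;

    public void run() throws Exception {
        transitionToRunning();             // roughly step [1]: the task switches to RUNNING
        initializeStateAndOpenOperators(); // roughly step [2]: operators' initializeState() runs,
                                           // which is where "{} - restoring state" is logged
        while (running) {
            // Only now are records AND checkpoint barriers pulled from the input,
            // so a slow restore above delays barrier processing and the checkpoint.
            processInput();
        }
    }

    private void transitionToRunning() { /* illustrative stub */ }

    private void initializeStateAndOpenOperators() throws Exception { /* illustrative stub */ }

    private void processInput() {
        // Illustrative stub; stop after one iteration so the sketch terminates.
        running = false;
    }
}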