Anton Kalashnikov created FLINK-22379:
-----------------------------------------

             Summary: Introduce a new JobStatus to avoid premature checkpoint triggering
                 Key: FLINK-22379
                 URL: https://issues.apache.org/jira/browse/FLINK-22379
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Checkpointing
            Reporter: Anton Kalashnikov


Right now, when JobStatus switches to RUNNING, the CheckpointCoordinator is allowed to trigger checkpoints, which is fine in itself. Unfortunately, JobStatus switches to RUNNING before the task state (ExecutionState) has even switched to SCHEDULED. This leads to several problems, one of which is visible in the log:
{noformat}
WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint for job bc943302f92d979824fbc8f4cabc5db3.)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: EventSource -> Timestamps/Watermarks (1/7) of job bc943302f92d979824fbc8f4cabc5db3 has not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
	at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_272]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
{noformat}
To avoid this problem, it would be a good idea to introduce a new JobStatus between CREATED and RUNNING (perhaps RESTORING). Then:
 * JobStatus CREATED switches to RESTORING at the point where CREATED currently switches to RUNNING
 * JobStatus RESTORING switches to RUNNING once all tasks have switched their state from INITIALIZING to RUNNING

It also makes sense to rename ExecutionState.INITIALIZING to RESTORING so that the job and the tasks use the same name for this state.
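
The proposed transitions can be illustrated with a small, self-contained sketch. This is not Flink code: the class RestoringStatusSketch, its nested Job class, and the methods start(), onTaskStateChange() and canTriggerCheckpoint() are hypothetical names chosen for illustration, and the enums only mirror the state names mentioned in this issue. It assumes the job is promoted to RUNNING as soon as the last task reports RUNNING.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the proposal; these are not Flink's actual classes. */
public class RestoringStatusSketch {

    /** Job-level status with the proposed RESTORING value between CREATED and RUNNING. */
    enum JobStatus { CREATED, RESTORING, RUNNING }

    /** Subset of the task-level states named in the issue. */
    enum ExecutionState { CREATED, SCHEDULED, INITIALIZING, RUNNING }

    static final class Job {
        private JobStatus status = JobStatus.CREATED;
        private final List<ExecutionState> tasks = new ArrayList<>();

        Job(int numberOfTasks) {
            for (int i = 0; i < numberOfTasks; i++) {
                tasks.add(ExecutionState.CREATED);
            }
        }

        /** Invoked at the point where CREATED switches to RUNNING today. */
        void start() {
            if (status == JobStatus.CREATED) {
                status = JobStatus.RESTORING;
            }
        }

        /** Records a task state change; promotes the job once every task is RUNNING. */
        void onTaskStateChange(int taskIndex, ExecutionState newState) {
            tasks.set(taskIndex, newState);
            boolean allRunning = tasks.stream().allMatch(s -> s == ExecutionState.RUNNING);
            if (status == JobStatus.RESTORING && allRunning) {
                status = JobStatus.RUNNING;
            }
        }

        /** A checkpoint may only be triggered once the job itself is RUNNING. */
        boolean canTriggerCheckpoint() {
            return status == JobStatus.RUNNING;
        }

        JobStatus status() {
            return status;
        }
    }

    public static void main(String[] args) {
        Job job = new Job(2);
        job.start();
        System.out.println(job.status() + " checkpoint allowed: " + job.canTriggerCheckpoint());
        job.onTaskStateChange(0, ExecutionState.RUNNING);
        System.out.println(job.status() + " checkpoint allowed: " + job.canTriggerCheckpoint());
        job.onTaskStateChange(1, ExecutionState.RUNNING);
        System.out.println(job.status() + " checkpoint allowed: " + job.canTriggerCheckpoint());
    }
}
```

With this gating, a checkpoint trigger that arrives while some task is still in SCHEDULED or INITIALIZING would be rejected up front by the job status check rather than failing later in the checkpoint plan calculation.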