Hi guys,
We would like to start a discussion on a new FLIP about rejecting checkpoints at the operator level. The basic idea is to allow an operator to reject a checkpoint when it is not in a proper state to take one, and to return an appropriate failure reason.

http://cwiki.apache.org/confluence/display/FLINK/FLIP-170+Adding+Checkpoint+Rejection+Mechanism

Looking forward to your participation!

Best,
Senhong

--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
Here is some brief context about the new feature.
1. Actively rejecting checkpoints at the operator level. Following the current checkpoint mechanism, one more preliminary step is added to help the operator determine whether it is able to take a snapshot. This preliminary step is a new API provided to users/developers. The new API will be implemented in the Source API (the new one based on FLIP-27) for the CDC implementation. It can also be implemented in other operators if necessary.

2. Handling the failure returned from the operator. If the checkpoint is rejected by the operator, an appropriate failure reason needs to be returned from the operator as well. In the current design, two failure reasons are listed: soft failure and hard failure. The former would be ignored by Flink, while the latter would be counted as a continuous checkpoint failure according to the current checkpoint failure manager mechanism.

3. To prevent the operator from repeatedly reporting soft failures, so that no checkpoint can complete for a long time, we introduce a new configuration, a tolerable checkpoint failure timeout, implemented as a timer that starts with the checkpoint scheduler. The timer is reset if and only if a checkpoint completes. Otherwise it does nothing until the tolerable timeout is hit; if the timer fires, it triggers the current checkpoint failover.

Questions:
a. Under the current design, checkpoints might keep failing for a long time, for example with a large checkpoint interval. Is there a better way to make checkpoints more likely to succeed? One option is to trigger a checkpoint immediately after the last one is rejected, but that seems inappropriate because it would increase the overhead.
b. Is there a better way to handle soft failures?
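To make point 1 concrete, here is a minimal sketch of what such an operator-level rejection API might look like. All names (`CheckpointAvailabilityProvider`, `CheckpointAvailability`, `CdcLikeSource`) are hypothetical and for illustration only; the FLIP will define the actual interface:

```java
// Hypothetical sketch of an operator-level checkpoint rejection API.
// These are NOT the real FLIP-170 interfaces, just an illustration.
public class CheckpointRejectionSketch {

    enum CheckpointAvailability { AVAILABLE, SOFT_FAILURE, HARD_FAILURE }

    interface CheckpointAvailabilityProvider {
        // Preliminary step: called before the operator is asked to snapshot,
        // so the operator can reject the checkpoint with a failure reason.
        CheckpointAvailability isCheckpointAvailable(long checkpointId);
    }

    // Example: a CDC-like source that cannot snapshot mid-way through
    // an atomic unit of work, so it soft-rejects until it is safe.
    static class CdcLikeSource implements CheckpointAvailabilityProvider {
        boolean inUnfinishedUnit = true;

        @Override
        public CheckpointAvailability isCheckpointAvailable(long checkpointId) {
            // Soft failure: not an error, just "not a good time to snapshot".
            return inUnfinishedUnit
                    ? CheckpointAvailability.SOFT_FAILURE
                    : CheckpointAvailability.AVAILABLE;
        }
    }

    public static void main(String[] args) {
        CdcLikeSource source = new CdcLikeSource();
        System.out.println(source.isCheckpointAvailable(1L)); // SOFT_FAILURE
        source.inUnfinishedUnit = false;
        System.out.println(source.isCheckpointAvailable(2L)); // AVAILABLE
    }
}
```

The key design point is that the operator communicates a reason (soft vs. hard) rather than simply throwing, so the checkpoint coordinator can treat the two cases differently.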
Hi Senhong,
Thanks for the proposal. I have a couple of questions.

Have you seen the `org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource` (for the legacy SourceFunction) and `org.apache.flink.api.connector.source.ExternallyInducedSourceReader` (for FLIP-27) interfaces? They work the other way around, by letting the source trigger/initiate a checkpoint instead of declining it. Could they be made to work in your use case? If not, can you explain why?

Regarding declining/failing the checkpoint (without blocking the barrier while waiting for snapshot availability), can you not achieve the same thing by combining throwing an exception in, for example, the `org.apache.flink.api.connector.source.SourceReader#snapshotState` call with setting the tolerable checkpoint failure number? [1]

Best,
Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html#setTolerableCheckpointFailureNumber-int-
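The existing alternative Piotrek describes can be sketched without a real Flink runtime. The `FailureCounter` class below is a hypothetical simulation of the counting behaviour behind `setTolerableCheckpointFailureNumber` (each failed checkpoint increments a continuous-failure counter, each completed checkpoint resets it); it is not Flink's actual implementation:

```java
// Illustrative simulation of tolerable-checkpoint-failure counting.
// Not Flink code; mimics the behaviour configured via
// CheckpointConfig#setTolerableCheckpointFailureNumber.
public class TolerableFailureSketch {

    static class FailureCounter {
        final int tolerable;
        int continuousFailures = 0;

        FailureCounter(int tolerable) {
            this.tolerable = tolerable;
        }

        // Returns true when the failure budget is exhausted,
        // i.e. the job should fail over.
        boolean onCheckpointFailed() {
            continuousFailures++;
            return continuousFailures > tolerable;
        }

        // A completed checkpoint resets the continuous-failure count.
        void onCheckpointCompleted() {
            continuousFailures = 0;
        }
    }

    public static void main(String[] args) {
        FailureCounter counter = new FailureCounter(2);
        System.out.println(counter.onCheckpointFailed()); // false
        System.out.println(counter.onCheckpointFailed()); // false
        System.out.println(counter.onCheckpointFailed()); // true: fail over
    }
}
```

The limitation this thread turns on is that such a counter treats every failure the same; it has no notion of a "soft" rejection that should not consume the failure budget.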
Hi Piotrek,
Thanks for your feedback!

1. Why not ExternallyInducedSourceReader/ExternallyInducedSource?
a. `org.apache.flink.api.connector.source.ExternallyInducedSourceReader` and `org.apache.flink.api.connector.source.ExternallyInducedSource` seem to take over the role of the checkpoint coordinator. Once one of them is implemented, the source reader might need to duplicate logic similar to what the checkpoint coordinator does. I think it would be better to let the checkpoint coordinator keep its own role of triggering checkpoints.
b. The new interface can be implemented not only in the source operator but also in other operators. However, I do not have a solid use case for implementing it in a downstream operator so far, so this is mainly for future compatibility.

2. Why not an exception?
Actually, I don't think rejecting a checkpoint is an exception. As with the soft failure introduced in the FLIP, the rejection, and therefore the checkpoint failure, could be acceptable to the user. The tolerable checkpoint failure number, however, should only count failures that are NOT acceptable to the user, e.g., checkpoint expiration.
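The tolerable checkpoint failure timeout from point 3 of the proposal could complement the distinction above: soft failures are ignored by the counter, and only wall-clock time since the last completed checkpoint triggers failover. A hypothetical simulation (all names illustrative, not part of the FLIP):

```java
// Illustrative simulation of the proposed tolerable checkpoint
// failure timeout: the timer is reset only by a completed checkpoint;
// soft failures leave it running. Not Flink code.
public class CheckpointTimeoutSketch {

    static class FailureTimeoutTimer {
        final long timeoutMillis;
        long lastCompletedAt;

        // The timer starts together with the checkpoint scheduler.
        FailureTimeoutTimer(long timeoutMillis, long startMillis) {
            this.timeoutMillis = timeoutMillis;
            this.lastCompletedAt = startMillis;
        }

        // Only a completed checkpoint resets the timer.
        void onCheckpointCompleted(long nowMillis) {
            lastCompletedAt = nowMillis;
        }

        // Soft failures are deliberately not recorded here; failover is
        // triggered purely by how long no checkpoint has completed.
        boolean shouldFailOver(long nowMillis) {
            return nowMillis - lastCompletedAt > timeoutMillis;
        }
    }

    public static void main(String[] args) {
        FailureTimeoutTimer timer = new FailureTimeoutTimer(10_000, 0);
        System.out.println(timer.shouldFailOver(5_000));  // false
        System.out.println(timer.shouldFailOver(15_000)); // true: fail over
        timer.onCheckpointCompleted(15_000);
        System.out.println(timer.shouldFailOver(20_000)); // false
    }
}
```

This also makes question (a) concrete: with a large checkpoint interval, only a few trigger attempts fit inside the timeout window, so a run of soft rejections can exhaust it without any hard failure occurring.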