[DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism

[DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism

Senhong Liu
Hi guys,

We would like to start a discussion on the new FLIP about rejecting
checkpoints at the operator level. The basic idea is to allow an operator
to reject a checkpoint when it is not in a suitable state to take one, and
to return an appropriate failure reason.

http://cwiki.apache.org/confluence/display/FLINK/FLIP-170+Adding+Checkpoint+Rejection+Mechanism


Looking forward to your participation!

Best,
Senhong



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/

Re: [DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism

Senhong Liu
Here is some brief context about the new feature.

1. Active checkpoint rejection by the operator. On top of the current
checkpoint mechanism, one preliminary step is added so that the operator
can determine whether it is able to take a snapshot. This preliminary step
is a new API exposed to users/developers. It will be implemented in the
Source API (the new one based on FLIP-27) for the CDC implementation, and
it can also be implemented in other operators if necessary.

2. Handling the failure returned from the operator. If the operator rejects
a checkpoint, it must also return an appropriate failure reason. The
current design lists two failure reasons: soft failure and hard failure.
Flink ignores the former and counts the latter as a continuous checkpoint
failure, following the current checkpoint failure manager mechanism.

3. To prevent an operator from reporting soft failures indefinitely, so
that no checkpoint completes for a long time, we introduce a new
configuration for the tolerable checkpoint failure timeout. It is a timer
that starts together with the checkpoint scheduler. The timer is reset
only when a checkpoint completes; otherwise it does nothing until the
tolerable timeout is hit. If the timer fires, it triggers the current
checkpoint failover.
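
To make the three points above concrete, here is a rough plain-Java sketch
of the bookkeeping. It is only an illustration under assumptions, not the
FLIP's actual API: the names `RejectionKind`, `CheckpointRejectable`,
`checkBeforeSnapshot` and `RejectionTracker` are hypothetical, and the
timeout is checked whenever a checkpoint result arrives instead of by a
real timer.

```java
import java.util.Optional;

/** Hypothetical rejection reasons mirroring the soft and hard failures above. */
enum RejectionKind { SOFT, HARD }

/** Hypothetical pre-snapshot hook; this is NOT an existing Flink interface. */
interface CheckpointRejectable {
    /** Empty if the operator can snapshot now, otherwise the kind of rejection. */
    Optional<RejectionKind> checkBeforeSnapshot(long checkpointId);
}

/** Toy coordinator-side bookkeeping for rejections and the tolerable timeout. */
final class RejectionTracker {
    private final int tolerableHardFailures;
    private final long tolerableTimeoutMs;
    private int continuousHardFailures = 0;
    private long lastResetAtMs;

    RejectionTracker(int tolerableHardFailures, long tolerableTimeoutMs, long schedulerStartMs) {
        this.tolerableHardFailures = tolerableHardFailures;
        this.tolerableTimeoutMs = tolerableTimeoutMs;
        // The timer starts together with the checkpoint scheduler.
        this.lastResetAtMs = schedulerStartMs;
    }

    /** Records one checkpoint outcome; returns true if failover should be triggered. */
    boolean onCheckpointResult(Optional<RejectionKind> rejection, long nowMs) {
        if (!rejection.isPresent()) {
            // Only a completed checkpoint resets the counter and the timer.
            continuousHardFailures = 0;
            lastResetAtMs = nowMs;
            return false;
        }
        if (rejection.get() == RejectionKind.HARD) {
            continuousHardFailures++;
        }
        // Soft rejections are not counted, but they are still bounded by the timeout.
        boolean tooManyHardFailures = continuousHardFailures > tolerableHardFailures;
        boolean timedOut = nowMs - lastResetAtMs >= tolerableTimeoutMs;
        return tooManyHardFailures || timedOut;
    }
}
```

With a tolerable timeout of 1000 ms, an operator that keeps returning SOFT
is ignored until 1000 ms have passed since the last completed checkpoint,
and only then does the tracker ask for failover.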

Questions:
a. With the current design, checkpoints might keep failing for a long time,
for example when the checkpoint interval is large. Is there a better way to
make the checkpoint more likely to succeed? One option is to trigger a new
checkpoint immediately after the last one is rejected, but that seems
inappropriate because it would increase the overhead.
b. Is there a better way to handle the soft failure?






Re: [DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism

Piotr Nowojski-5
Hi Senhong,

Thanks for the proposal. I have a couple of questions.

Have you seen
`org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource` (for
the legacy SourceFunction) and
`org.apache.flink.api.connector.source.ExternallyInducedSourceReader` (for
FLIP-27) interfaces? They work the other way around, by letting the source
trigger/initiate a checkpoint instead of declining it. Could they be made
to work in your use case? If not, can you explain why?

Regarding declining/failing the checkpoint (without blocking the barrier
while waiting for snapshot availability), can't you achieve the same thing
by combining an exception thrown in, for example, the
`org.apache.flink.api.connector.source.SourceReader#snapshotState` call with
the tolerable checkpoint failure number setting? [1]
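
As a rough illustration of that combination, the plain-Java sketch below
models a reader whose snapshot call throws while it is not ready, together
with a counter of continuous failures. It does not use Flink classes:
`SnapshottingReader` and `TolerableFailureModel` are hypothetical stand-ins,
and the counting rule (reset on success, fail the job once the count
exceeds the tolerable number) is an assumption about how the setting in [1]
behaves.

```java
/** Hypothetical stand-in for a source reader whose snapshot call may throw. */
interface SnapshottingReader {
    void snapshotState(long checkpointId) throws Exception;
}

/** Toy model of a tolerable checkpoint failure number; not Flink code. */
final class TolerableFailureModel {
    private final int tolerableFailureNumber;
    private int continuousFailures = 0;

    TolerableFailureModel(int tolerableFailureNumber) {
        this.tolerableFailureNumber = tolerableFailureNumber;
    }

    /** Runs one checkpoint attempt; returns true if the job would be failed. */
    boolean attempt(SnapshottingReader reader, long checkpointId) {
        try {
            reader.snapshotState(checkpointId);
            continuousFailures = 0; // a successful checkpoint resets the counter
            return false;
        } catch (Exception e) {
            // Every exception counts the same, whatever the reason for declining.
            continuousFailures++;
            return continuousFailures > tolerableFailureNumber;
        }
    }
}
```

With a tolerable number of 2, two declined checkpoints in a row are
tolerated and a third one fails the job, unless a successful checkpoint in
between resets the counter.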

Best, Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html#setTolerableCheckpointFailureNumber-int-

On Wed, 9 Jun 2021 at 09:11, Senhong Liu <[hidden email]> wrote:


Re: [DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism

Senhong Liu
Hi Piotrek,

Thanks for your feedback!

1. Why not ExternallyInducedSourceReader/ExternallyInducedSource?
a. The
`org.apache.flink.api.connector.source.ExternallyInducedSourceReader` and
`org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource`
interfaces seem to play the role of the checkpoint coordinator. Once one of
them is implemented, the source reader might need logic similar to what the
checkpoint coordinator already does. I think it would be better to let the
checkpoint coordinator keep its own role of triggering checkpoints.
b. The new interface can be implemented not only in the source operator but
also in other operators. However, I do not have a solid use case for
implementing it in a downstream operator so far, so this is mainly for
future compatibility.

2. Why not an exception?
I don't think rejecting a checkpoint is an exceptional condition. As with
the soft failure I introduced in the FLIP, a rejection and the resulting
checkpoint failure can be acceptable to the user. The tolerable checkpoint
failure number, however, is meant to count only those failures that are NOT
acceptable to the user, e.g., checkpoint expiration.



On Wed, 9 Jun 2021 at 3:37 PM, Piotr Nowojski <[hidden email]> wrote:
