Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Posted by Aljoscha Krettek-2 on
URL: http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/DISCUSS-FLIP-147-Support-Checkpoints-After-Tasks-Finished-tp45522p47577.html

On 2021/01/05 10:16, Arvid Heise wrote:
>1. I'd think that this is an orthogonal issue, which I'd solve separately.
>My gut feeling says that this is something we should only address for new
>sinks where we decouple the semantics of commits and checkpoints
>anyways. @Aljoscha Krettek any idea on this one?

I also think it's somewhat orthogonal, let's see where we land here once
the other issues are hammered out.

>2. I'm not sure I get it completely. Let's assume we have a source
>partition that is finished before the first checkpoint. Then, we would need
>to store the finished state of the subtask somehow. So I'm assuming, we
>still need to trigger some checkpointing code on finished subtasks.

What he's talking about here is the race condition between a) the
checkpoint coordinator deciding to do a checkpoint and b) a source
operator shutting down.

Normally, the checkpoint coordinator only needs to trigger sources, and
not intermediate operators. When we allow sources to shut down,
intermediate operators can now become the "head" of a pipeline and
become the things that need to be triggered.
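
To make the "new head of the pipeline" idea concrete, here is a minimal
sketch in Python. It is a toy model, not Flink's checkpoint coordinator:
the function name `tasks_to_trigger`, the `upstream` mapping, and the
task names are all made up for illustration. The assumed rule is that a
task must be triggered directly if it is still running and none of its
upstream tasks is still running.

```python
from typing import Dict, List, Set


def tasks_to_trigger(upstream: Dict[str, List[str]], finished: Set[str]) -> Set[str]:
    """Return the running tasks that have no running upstream task.

    `upstream` maps each task to the tasks feeding it (hypothetical model).
    A task with a running upstream task is not returned, because it would
    receive checkpoint barriers from that upstream task instead.
    """
    return {
        task
        for task, inputs in upstream.items()
        if task not in finished and all(i in finished for i in inputs)
    }


# Hypothetical linear pipeline: source -> map -> sink.
pipeline = {"source": [], "map": ["source"], "sink": ["map"]}

# Hypothetical join with two sources feeding one operator.
join = {"src_a": [], "src_b": [], "join": ["src_a", "src_b"]}
```

With nothing finished, only `source` is triggered; once `source` has
finished, `map` becomes the head. In the join case, `join` only needs
direct triggering after both sources are done. The sketch does not
remove the race: the `finished` set can change between computing the
result and delivering the trigger.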

One thought here is this: will there ever be intermediate operators that
should be running that are not connected to at least one source? The
only case I can think of right now is async I/O. Or are there others? If
we think that there will never be intermediate operators that are not
connected to at least one source we might come up with a simpler
solution.

>3. Do we really want to store the finished flag in OperatorState? I was
>assuming we want to have it more fine-grained on OperatorSubtaskState.
>Maybe we can store the flag inside managed or raw state without changing
>the format?

I think we cannot store it in `OperatorSubtaskState` because of how
operator state (the actual `ListState` that operators use) is reshuffled
on restore to all operators. So normally it doesn't make sense to say
that one of the subtasks is done when operator state is involved. Only
when all subtasks are done can we record this operator as done, I think.
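
A small sketch of why a per-subtask flag is hard to interpret, again in
Python and again only a toy model. The `redistribute` function assumes a
simple round-robin split of the pooled list entries; the actual
redistribution in Flink may differ in detail, and both function names
are hypothetical.

```python
from typing import List


def redistribute(subtask_states: List[List[str]], new_parallelism: int) -> List[List[str]]:
    """Pool all list-state entries and deal them out round-robin.

    Assumption: entries carry no memory of the subtask that wrote them,
    so a "finished" marker on an old subtask has no owner after restore.
    """
    pooled = [entry for state in subtask_states for entry in state]
    restored: List[List[str]] = [[] for _ in range(new_parallelism)]
    for index, entry in enumerate(pooled):
        restored[index % new_parallelism].append(entry)
    return restored


def operator_finished(subtask_finished: List[bool]) -> bool:
    """Record the operator as done only when every subtask is done."""
    return all(subtask_finished)


# Two old subtasks; suppose the second one had already finished.
old_states = [["a", "b"], ["c"]]
restored = redistribute(old_states, 3)
```

After the restore, entry `"c"` from the finished subtask sits in a new
subtask next to nothing that marks it as finished, and entries from the
unfinished subtask are spread over the other two. That is the reason for
tracking the flag per operator, as in `operator_finished`.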

Best,
Aljoscha