[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Aljoscha Krettek-2
On 2021/01/06 11:30, Arvid Heise wrote:
>I'm assuming that this is the normal case. In an A->B graph, as soon as A
>finishes, B still has a couple of input buffers to process. If you add
>backpressure or longer pipelines into the mix, it's quite likely that a
>checkpoint may occur with B being the head.

Ahh, I think I know what you mean. This can happen when the checkpoint
coordinator issues concurrent checkpoints without waiting for older ones
to finish. My head is mostly operating under the premise that there is
at most one concurrent checkpoint.

In the current code base the race conditions that Yun and I are talking
about cannot occur. Checkpoints can only be triggered at sources and
they will then travel through the graph. Intermediate operators are
never directly triggered from the JobManager/CheckpointCoordinator.

When sources start to shut down, the JM has to directly inject/trigger
checkpoints at the now new "sources" of the graph, which have previously
been intermediate operators.

I want to repeat that I have a suspicion that maybe this is a degenerate
case and we never want to allow operators to be doing checkpoints when
they are not connected to at least one running source. That would mean
we have to find a solution for declined checkpoints and missing sources.

I'll first show an example where I think we will never have intermediate
operators running without the sources being running:

Source -> Map -> Sink

Here, when the Source does its final checkpoint and then shuts down,
that same final checkpoint would travel downstream ahead of the EOF,
which would in turn cause Map and Sink to also shut down. *We can't have
the case that Map is still running when we want to take a checkpoint and
Source is not running*.

A similar case is this one:

Source1 --+
           |->Map -> Sink
Source2 --+

Here, if Source1 is finished but Source2 is not, Map is still connected
to at least one upstream source that is still running. Again, Map would
never be running and doing checkpoints if neither Source1 nor Source2
is online.

The cases I see where intermediate operators would keep running despite
not being connected to any upstream operators are when we purposefully
keep an operator online despite all inputs having seen EOF. One example
is async I/O, another is what Yun mentioned where a sink might want to
wait for another checkpoint to confirm some data. Example:

Source -> Async I/O -> Sink

Here, Async I/O will stay online as long as there are some scheduled
requests outstanding, even when the Source has shut down. In those
cases, the checkpoint coordinator would have to trigger new checkpoints
at Async I/O and not Source, because it has become the new "head" of the
graph.

For Async I/O at least, we could say that the operator will wait for all
outstanding requests to finish before it allows the final checkpoint and
passes the barrier forward.

Best,
Aljoscha

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Arvid Heise-3
I was actually not thinking about concurrent checkpoints (and actually want
to get rid of them once unaligned checkpoints (UC) are established, since
they address the same thing).

But your explanation definitely helped me to better understand the race
condition.

However, I have the impression that you think mostly in terms of tasks and
I mostly think in terms of subtasks. I especially want to have proper
support for bounded sources where one partition is much larger than the
other partitions (might be in conjunction with unbounded sources such that
checkpointing is plausible to begin with). Hence, most of the subtasks are
finished with one straggler remaining. In this case, the barriers are
now inserted only in the straggling source subtask and potentially in any
running downstream subtask.
As far as I have understood, this would require barriers to be inserted
downstream, leading to similar race conditions.

I'm also concerned about the notion of a final checkpoint. What happens
when this final checkpoint times out (checkpoint timeout > async timeout)
or fails for a different reason? I'm currently more inclined to just let
checkpoints work until the whole graph is completed (and thought this was
the initial goal of the whole FLIP to begin with). However, that would
require subtasks to stay alive until they receive the checkpointCompleted
callback (which is currently also not guaranteed)...


Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
Hi Arvid,

Thanks a lot for the feedback! I'll try to answer the questions inline:

> I'm also concerned about the notion of a final checkpoint. What happens
> when this final checkpoint times out (checkpoint timeout > async timeout)
> or fails for a different reason? I'm currently more inclined to just let
> checkpoints work until the whole graph is completed (and thought this was
> the initial goal of the whole FLIP to begin with).

I think we are still on the same page that we would like to trigger checkpoints periodically until the whole job is finished.
In general we do not have to force checkpoints to align with subtask finish: for example, one operator
might have the lifecycle "take one checkpoint -> emit some records -> take another checkpoint -> emit more records -> finish",
and it does not have to wait for one more checkpoint before finishing. The second checkpoint just happens to be the "final" checkpoint of this operator.
The only exception is a sink operator that must wait for one more checkpoint to commit the last piece of data before finishing; this kind of operator
would be dealt with separately and forced to wait for a checkpoint before finishing.

> However, I have the impression that you think mostly in terms of tasks and
> I mostly think in terms of subtasks. I especially want to have proper
> support for bounded sources where one partition is much larger than the
>  other partitions (might be in conjunction with unbounded sources such that
> checkpointing is plausible to begin with). Hence, most of the subtasks are
> finished with one straggler remaining. In this case, the barriers are
> now inserted only in the straggling source subtask and potentially in any
> running downstream subtask.
> As far as I have understood, this would require barriers to be inserted
> downstream leading to similar race conditions.

I might not fully understand the issue, but I'd like to further detail
the expected process here:

Source (subtask 0) ---+
                      |
Source (subtask 1) ---+--> Async I/O (subtask 0) -> Sink (subtask 0)
                      |
Source (subtask 2) ---+


The async I/O subtask would have three input channels.

Case 1) Suppose source subtasks 0 and 1 are finished and the Async I/O subtask has received EndOfPartition from the corresponding
channels. Now we happen to trigger a checkpoint; in the remaining execution graph, source subtask 2 is the "source" of the
graph. We would then trigger source subtask 2 to start the checkpoint; it takes a snapshot and emits barriers to the
Async I/O subtask. The Async I/O subtask finds that 2/3 of its input channels have received EndOfPartition and that it has received the barrier from
the remaining channel, so it knows the barriers are aligned; it then takes its snapshot and emits the barrier to the sink subtasks.

Case 2) Suppose the job continues to run, source subtask 2 is now also finished, and we are going to take another checkpoint.
We find that in the remaining execution graph the new "source" is now the Async I/O subtask, so we would trigger the
Async I/O subtask instead (this differs from the current implementation). The Async I/O subtask receives the trigger, takes its snapshot and
emits the barrier to the following sink subtask. (Of course the Async I/O subtask needs some way to wait until it has received EndOfPartition
from all input channels before taking the snapshot to stay consistent, but I think we can ignore the implementation details for now.)

The race condition might happen if
a) in case 1, the CheckpointCoordinator triggers source subtask 2, but source subtask 2 reports FINISHED before the trigger RPC reaches the TaskManager it resides on.
b) in case 2, the CheckpointCoordinator triggers the Async I/O subtask, but it reports FINISHED before the trigger RPC reaches its TaskManager.

If we do not handle this case specially, then based on the current implementation the trigger RPC would simply be ignored and the checkpoint would eventually
fail due to timeout, since no task would report its state. But we would be able to remedy this checkpoint: source subtask 2 and the Async I/O
subtask report their FINISHED status to the JobMaster after we try to trigger them and before they report a snapshot for this checkpoint.
The CheckpointCoordinator would listen for these notifications; when it receives one, it would iterate over its pending checkpoints to see
whether it has triggered this task but received FINISHED before its snapshot. If so, it would recompute the subtasks to trigger and re-trigger the following tasks.
Of course this is just one possible implementation and we might have other solutions to this problem. Do you think this process would still have problems?
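
To make this more concrete, a rough sketch of the re-trigger logic I have in mind (all class and method names below are made up for illustration, not the existing CheckpointCoordinator API):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: re-trigger a pending checkpoint when a task that was
// triggered reports FINISHED before acknowledging its snapshot.
class RetriggerOnFinishSketch {

    /** Minimal stand-in for a pending checkpoint on the JobMaster side. */
    interface PendingCheckpoint {
        long checkpointId();
        boolean wasTriggered(String taskId);
        boolean hasAcknowledged(String taskId);
        /** Recompute the "new sources" of the remaining graph after this task finished. */
        List<String> recomputeTasksToTrigger(String finishedTaskId);
    }

    private final Map<Long, PendingCheckpoint> pendingCheckpoints = new HashMap<>();

    /** Called when the JobMaster receives a FINISHED notification for a task. */
    void onTaskFinished(String finishedTaskId) {
        for (PendingCheckpoint pending : pendingCheckpoints.values()) {
            // Only remedy checkpoints that triggered this task but never got its snapshot.
            if (pending.wasTriggered(finishedTaskId) && !pending.hasAcknowledged(finishedTaskId)) {
                for (String downstream : pending.recomputeTasksToTrigger(finishedTaskId)) {
                    sendTriggerRpc(downstream, pending.checkpointId());
                }
            }
        }
    }

    private void sendTriggerRpc(String taskId, long checkpointId) {
        // RPC to the TaskExecutor hosting the task (elided in this sketch).
    }
}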

> However, that would
> require subtasks to stay alive until they receive the checkpointCompleted
> callback (which is currently also not guaranteed)

With the above process, I think a task would not need to wait to receive the checkpointCompleted callback: once it has finished, the above process
would take care of triggering its following tasks.

Best,
Yun

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Arvid Heise-3
Hi Yun,

thanks for the detailed example. It feels like Aljoscha and you are also
not fully aligned yet. For me, it sounded as if Aljoscha would like to
avoid sending RPC to non-source subtasks.

> I think we are still on the same page that we would like to trigger
> checkpoints periodically until the whole job is finished.
> In general we do not have to force checkpoints to align with subtask
> finish: for example, one operator might have the lifecycle "take one
> checkpoint -> emit some records -> take another checkpoint -> emit more
> records -> finish", and it does not have to wait for one more checkpoint
> before finishing. The second checkpoint just happens to be the "final"
> checkpoint of this operator.
> The only exception is a sink operator that must wait for one more
> checkpoint to commit the last piece of data before finishing; this kind
> of operator would be dealt with separately and forced to wait for a
> checkpoint before finishing.
>

Yes, that sounds good. I was concerned about any "holding back" of barriers in
async I/O. I'd just hold back the EOP until all async threads have finished and
forward barriers in the normal way.
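
To make that concrete, a minimal sketch of what I mean (a simplified stand-in, not the actual AsyncWaitOperator code):

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified sketch: barriers are forwarded in the normal way while the operator
// is running; only the end of input is held back until all in-flight async
// requests have completed.
class AsyncOperatorSketch {

    private final Queue<CompletableFuture<?>> inFlight = new ConcurrentLinkedQueue<>();

    void processElement(Object element) {
        CompletableFuture<?> request = fireAsyncRequest(element);
        inFlight.add(request);
        request.thenRun(() -> inFlight.remove(request));
    }

    void processBarrier(long checkpointId) {
        // Nothing is held back here: snapshot and forward the barrier as usual.
        snapshotState(checkpointId);
        forwardBarrier(checkpointId);
    }

    void endInput() {
        // Hold back the EOP: only signal "finished" downstream once every
        // outstanding async request has completed.
        for (CompletableFuture<?> request : inFlight) {
            request.join();
        }
        emitEndOfPartition();
    }

    // --- plumbing assumed to exist in the surrounding runtime (elided) ---
    private CompletableFuture<?> fireAsyncRequest(Object element) { return CompletableFuture.completedFuture(null); }
    private void snapshotState(long checkpointId) {}
    private void forwardBarrier(long checkpointId) {}
    private void emitEndOfPartition() {}
}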

That would then also be my solution for sinks - hold back the EOP (=finish)
until the checkpoint is done. My concern here is still that we would need
a reliable mechanism to be notified when a checkpoint completes. Maybe we can
use the GlobalCommitter?

> If we do not handle this case specially, then based on the current
> implementation the trigger RPC would simply be ignored and the checkpoint
> would eventually fail due to timeout, since no task would report its
> state. But we would be able to remedy this checkpoint: source subtask 2
> and the Async I/O subtask report their FINISHED status to the JobMaster
> after we try to trigger them and before they report a snapshot for this
> checkpoint.
> The CheckpointCoordinator would listen for these notifications; when it
> receives one, it would iterate over its pending checkpoints to see
> whether it has triggered this task but received FINISHED before its
> snapshot. If so, it would recompute the subtasks to trigger and
> re-trigger the following tasks.
> Of course this is just one possible implementation and we might have
> other solutions to this problem. Do you think this process would still
> have problems?
>

Here I'm just concerned that we would overload the JM. Especially if it's
cascading: A is triggered in A->B->C but finishes, the JM computes B and
resends the RPC, but by that time B has also finished. Hence I was thinking of
using TMs instead and only falling back to the JM if the TM has exited.


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Aljoscha Krettek-2
In reply to this post by Arvid Heise-3
On 2021/01/06 13:35, Arvid Heise wrote:
>I was actually not thinking about concurrent checkpoints (and actually want
>to get rid of them once unaligned checkpoints (UC) are established, since
>they address the same thing).

I would give a yuge +1 to that. I don't see why we would need concurrent
checkpoints in most cases. (Any case even?)

>However, I have the impression that you think mostly in terms of tasks and
>I mostly think in terms of subtasks. I especially want to have proper
>support for bounded sources where one partition is much larger than the
>other partitions (might be in conjunction with unbounded sources such that
>checkpointing is plausible to begin with). Hence, most of the subtasks are
>finished with one straggler remaining. In this case, the barriers are
>now inserted only in the straggling source subtask and potentially in any
>running downstream subtask.
>As far as I have understood, this would require barriers to be inserted
>downstream, leading to similar race conditions.

No, I'm also thinking in terms of subtasks when it comes to triggering.
As long as a subtask has at least one upstream task we don't need to
manually trigger it. A task will know which of its inputs have
finished, so it will exclude those from the calculation that waits for
barriers from all upstream tasks. In the case where only a single
upstream source remains, the barriers from that task will then
trigger checkpointing at the downstream task.
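
Roughly, the alignment at a downstream task could look like this (a simplified sketch, not the actual barrier handler code):

import java.util.HashSet;
import java.util.Set;

// Simplified sketch: barrier alignment that ignores input channels which have
// already delivered EndOfPartition.
class AlignmentSketch {

    private final int numberOfChannels;
    private final Set<Integer> finishedChannels = new HashSet<>();
    private final Set<Integer> barrierReceived = new HashSet<>();

    AlignmentSketch(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    void onEndOfPartition(int channel) {
        // A finished input no longer counts towards alignment.
        finishedChannels.add(channel);
        maybeComplete();
    }

    void onBarrier(int channel) {
        barrierReceived.add(channel);
        maybeComplete();
    }

    private void maybeComplete() {
        // Alignment completes once every channel has either finished or delivered
        // the barrier. If all inputs are finished, the task would instead be
        // triggered directly by the coordinator, so we require at least one barrier.
        Set<Integer> covered = new HashSet<>(finishedChannels);
        covered.addAll(barrierReceived);
        if (!barrierReceived.isEmpty() && covered.size() == numberOfChannels) {
            takeSnapshotAndForwardBarrier();
            barrierReceived.clear();
        }
    }

    private void takeSnapshotAndForwardBarrier() {
        // Snapshot the operator state and emit the barrier downstream (elided).
    }
}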

>I'm also concerned about the notion of a final checkpoint. What happens
>when this final checkpoint times out (checkpoint timeout > async timeout)
>or fails for a different reason? I'm currently more inclined to just let
>checkpoints work until the whole graph is completed (and thought this was
>the initial goal of the whole FLIP to begin with). However, that would
>require subtasks to stay alive until they receive the checkpointCompleted
>callback (which is currently also not guaranteed)...

The idea is that the final checkpoint is whatever checkpoint succeeds in
the end. When a task (and I mostly mean subtask when I say task) knows
that it is done it waits for the next successful checkpoint and then
shuts down.

This is a basic question, though: should we simply keep all tasks
(subtasks) around forever until the whole graph shuts down? Our answer
for this was *no*, so far. We would like to allow tasks to shut down,
such that the resources are freed at that point.

Best,
Aljoscha

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Aljoscha Krettek-2
In reply to this post by Arvid Heise-3
On 2021/01/06 16:05, Arvid Heise wrote:
>thanks for the detailed example. It feels like Aljoscha and you are also
>not fully aligned yet. For me, it sounded as if Aljoscha would like to
>avoid sending RPC to non-source subtasks.

No, I think we need the triggering of intermediate operators.

I was just thinking out loud about the potential scenarios where
intermediate operators will in fact stay online, and how common they
are.

Also, I sent an explanation that is similar to Yun's. It seems we always
write our mails in parallel and then send them before checking. :-) So
you always get two explanations of roughly the same thing.

Best,
Aljoscha

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Arvid Heise-3
Okay then at least you guys are in sync ;) (Although I'm also not too far
away)

I hope I'm not super derailing but could we reiterate why it's good to get
rid of finished tasks (note: I'm also mostly in favor of that):
1. We can free all acquired resources including buffer pools, state
backend(?), threads.
2. TM can forget about the subtask entirely.
3. We can subsequently downscale.
4. What more?

I'm assuming getting rid of them is not strictly needed to execute the application at all: the
application at one point had all subtasks running, so it's not a resource
issue per se (ignoring rescaling).

My idea is not to let the task live longer (except for final checkpoints
where we are all on the same page, I guess). I'm just thinking out loud about
whether we can avoid 2. while still doing 1.+3.

So can the TM retain some slim information about a finished task to still
process RPCs in a potentially different way?
That is, without keeping the costly task thread and operator chains, could we
implement some RPC handler that knows this is a finished task and forwards
the barrier to the next task/TM?
Can we store this slim information in a checkpoint as operator subtask
state?
Could we transfer this slim information in case of (dynamic) downscaling?

If this somehow works, we would not need to change much in the checkpoint
coordinator. It would always inject into sources. We could also ignore the
race conditions as long as the TM lives. Checkpointing times would also not be
worse than with the live task.
A clear downside (assuming feasibility) is that we would have two code paths that
deal with barriers. We would also need to keep more information in
the TM, but then again, at some point the complete subtask fit there.
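
As a purely hypothetical sketch of what such slim information could look like (no such handler exists today):

import java.util.List;

// Hypothetical sketch: a lightweight stub the TM keeps for a finished subtask so
// that a checkpoint trigger can still be handled by forwarding the barrier
// downstream, without keeping the task thread and operator chain alive.
class FinishedSubtaskStub {

    interface DownstreamGateway {
        void forwardBarrier(long checkpointId);
    }

    private final List<DownstreamGateway> downstreams;

    FinishedSubtaskStub(List<DownstreamGateway> downstreams) {
        this.downstreams = downstreams;
    }

    /** Handles a checkpoint trigger that arrives after the subtask has finished. */
    void onTriggerCheckpoint(long checkpointId) {
        // The finished subtask has no state to snapshot: acknowledge with empty
        // state and push the barrier onward so the checkpoint continues downstream.
        acknowledgeEmptyState(checkpointId);
        for (DownstreamGateway downstream : downstreams) {
            downstream.forwardBarrier(checkpointId);
        }
    }

    private void acknowledgeEmptyState(long checkpointId) {
        // Report an empty snapshot for this subtask to the checkpoint coordinator (elided).
    }
}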


Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
In reply to this post by Yun Gao
Hi Arvid,

Thanks a lot for the deep thoughts!

> If this somehow works, we would not need to change much in the checkpoint
> coordinator. It would always inject into sources. We could also ignore the
> race conditions as long as the TM lives. Checkpointing times would also not be
> worse than with the live task.
> A clear downside (assuming feasibility) is that we would have two code paths that
> deal with barriers. We would also need to keep more information in
> the TM, but then again, at some point the complete subtask fit there.

I also agree that with the slim information the checkpointing would be more unified with the normal process,
and it could simplify the changes to the CheckpointCoordinator. I thought about a rough design
in this direction, and I still have some concerns:

1. If we want to ensure we can always trigger the slim finished sources, we must ensure that the TaskExecutor
keeping this information is not released due to idle timeout. Thus the slim information would still pin some
resources in some scenarios. If we have mixed jobs with both bounded and unbounded sources, these
resources would stay pinned.

2. I still have some concerns about introducing a new RPC communication network between TaskManagers. We might
need to initiate the RPC connections together with the netty channels and close the RPC channels on job finish; this
means that on job finish the JM needs to notify the TMs to clean up these connections, which requires reliable
RPC messages from JM to TM (involving timeouts and resends), and the JM might be blocked before it finishes closing
these connections. This would also complicate failover, since on failover we need to close the original connections
and reconnect according to the new deployment. And for the RPC channels themselves, the TMs also need to maintain heartbeats
to ensure the channels are still open, which would increase the burden on the TM RPC services when we have a lot of TMs.

3. For the slim information, we would need to clear it on failover to avoid the TM being pinned wrongly. This also
requires the scheduler to introduce a new process to clear the slim information, similar to cancelling a running task.

4. Since checkpoints are only available for streaming jobs, we would probably only want to keep the slim information and create
the RPC channels between TaskManagers for streaming jobs. Currently batch jobs roughly only differ in that they do not have
a CheckpointCoordinator to trigger checkpoints, and the other components are mostly unified. If we also have to decide whether
to keep slim information and create the RPC channels, the scheduler and the network layer would also need to be aware of
the execution mode and have a special code path for streaming mode.

For the JM-based method to retry triggering the following tasks:

> Here I'm just concerned that we would overload the JM. Especially if it's cascading: A is triggered in A->B->C but finishes,
> the JM computes B and resends the RPC, but by that time B has also finished. Hence I was thinking of using TMs instead and only
> falling back to the JM if the TM has exited.

If the overhead is the main concern, I roughly think that we could avoid too many failed re-triggers by making the CheckpointCoordinator
wait a short period to accumulate more finished notifications before taking action. The JM would have a heavy burden if a large batch of
tasks finished within a short time; with the short wait time the JM would be able to smooth out the load and avoid
repeated re-triggering.
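
For example, something along these lines (the names and the 100 ms window are only assumptions for illustration):

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: accumulate FINISHED notifications for a short period
// before recomputing which tasks to re-trigger, instead of reacting to every
// single notification.
class DebouncedRetriggerSketch {

    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Set<String> recentlyFinished = new HashSet<>();
    private final long accumulationWindowMs = 100; // assumed value
    private boolean retriggerScheduled = false;

    synchronized void onTaskFinished(String taskId) {
        recentlyFinished.add(taskId);
        if (!retriggerScheduled) {
            retriggerScheduled = true;
            timer.schedule(this::retriggerPendingCheckpoints, accumulationWindowMs, TimeUnit.MILLISECONDS);
        }
    }

    private synchronized void retriggerPendingCheckpoints() {
        retriggerScheduled = false;
        // Recompute the tasks to trigger once for the whole batch of finished
        // tasks gathered during the window (elided).
        recentlyFinished.clear();
    }
}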

Best,
Yun

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Aljoscha Krettek-2
In reply to this post by Yun Gao
This is somewhat unrelated to the discussion about how to actually do
the triggering when sources shut down; I'll write on that separately. I
just wanted to get this quick thought out.

This is about letting operators decide whether they actually want to wait for a
final checkpoint, which is relevant at least for Async I/O and
potentially for sinks.

We could introduce an interface, sth like `RequiresFinalization` or
`FinalizationListener` (all bad names). The operator itself knows when
it is ready to completely shut down, Async I/O would wait for all
requests, sink would potentially wait for a given number of checkpoints.  
The interface would have a method like `isFinalized()` that the
framework can call after each checkpoint (and potentially at other
points)
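
Roughly (again, the name is just a placeholder):

// Placeholder name, as said above; just to make the shape of the idea concrete.
public interface RequiresFinalization {

    /**
     * Returns true once the operator has nothing left to wait for, e.g. Async I/O
     * has no outstanding requests or a sink has seen the checkpoints it needs.
     * The framework would call this after each completed checkpoint (and
     * potentially at other points) before allowing the task to fully shut down.
     */
    boolean isFinalized();
}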

This way we would decouple that logic from things that don't actually
need it. What do you think?

Best,
Aljoscha

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Arvid Heise-3
>
> We could introduce an interface, sth like `RequiresFinalization` or
> `FinalizationListener` (all bad names). The operator itself knows when
> it is ready to completely shut down, Async I/O would wait for all
> requests, sink would potentially wait for a given number of checkpoints.
> The interface would have a method like `isFinalized()` that the
> framework can call after each checkpoint (and potentially at other
> points)


I think we are mixing two different things here that may require different
solutions:
1. Tasks (=sink) that may need to do something with the final checkpoint.
2. Tasks that only finish after having finished operations that do not
depend on data flow (async I/O, but I could also think of some timer
actions in process functions).

Your proposal would help most for the first case. The second case can be
solved entirely with current methods without being especially complicated:
- EOP is only emitted once Async I/O is done with all background tasks
- All timers are fired in a process function (I think we rather want to
fire immediately on EOP but that's a different discussion)
The advantage of this approach over your idea is that you don't need to
wait for a checkpoint to complete to check for finalization.

Now let's look at the first case. I see two alternatives:
- The new sink interface implicitly incorporates this listener. Since I
don't see a use case outside sinks, we could simply add this method to the
new sink interface.
- We implicitly assume that a sink is done after having a successful
checkpoint at the end. Then we just need a tag interface
`RequiresFinalization`. It also feels like we should add the property
`final` to checkpoint options to help the sink detect that this is the last
checkpoint to be taken. We could also try to always have the final
checkpoint without the tag interface on new sinks...
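
Sketching the second alternative (hypothetical names; `CheckpointOptionsSketch` is just a stand-in, not the real CheckpointOptions):

// Hypothetical sketch of the second alternative: a pure tag interface plus a
// "final" flag on the checkpoint options so the sink can detect the last checkpoint.
public class FinalCheckpointSketch {

    /** Tag interface: a sink implementing this only finishes after a final successful checkpoint. */
    interface RequiresFinalization {}

    /** Stand-in for the options travelling with a checkpoint. */
    static final class CheckpointOptionsSketch {
        private final boolean finalCheckpoint;

        CheckpointOptionsSketch(boolean finalCheckpoint) {
            this.finalCheckpoint = finalCheckpoint;
        }

        /** True if no further checkpoints will follow for this subtask. */
        boolean isFinalCheckpoint() {
            return finalCheckpoint;
        }
    }

    /** Example of how a sink could use the flag to commit the last piece of data. */
    static class SinkSketch implements RequiresFinalization {
        void notifyCheckpointComplete(long checkpointId, CheckpointOptionsSketch options) {
            commitPendingData(checkpointId);
            if (options.isFinalCheckpoint()) {
                finish(); // everything up to this checkpoint is committed, safe to shut down
            }
        }

        private void commitPendingData(long checkpointId) {}
        private void finish() {}
    }
}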


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Khachatryan Roman
Thanks for starting this discussion (and sorry for possibly duplicated
questions; I couldn't find them answered in the FLIP or this thread).

1. Option 1 is said to be not preferable because it wastes resources and
adds complexity (a new event).
However, the resources would be wasted only for a relatively short time, until
the job finishes completely.
And compared to the other options, the complexity seems much lower. Or are
the differences in task completion times so huge and so common?

2. I think it would be helpful to describe how rescaling is handled in
Options 2 and 3 (or maybe it's not supported for jobs about to finish).

3. Option 3 assumes that the state of a finished task is not used. That's
true for operator state, but what about channel state (captured by
unaligned checkpoint)? I think it still has to be sent downstream which
invalidates this Option.

Regards,
Roman



Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
Hi Roman,

Thanks a lot for the feedback! I'll try to answer the issues inline:

> 1. Option 1 is said to be not preferable because it wastes resources and adds complexity (a new event).
> However, the resources would be wasted only for a relatively short time, until the job finishes completely.
> And compared to the other options, the complexity seems much lower. Or are the differences in task completion times so huge and so common?

There might be mixed jobs with both bounded and unbounded sources; in this case, the resources for the finished
part of the job could not be released.

And Option 1 also complicates the semantics of EndOfPartition: if we hold the tasks open but still need to
notify the following tasks that all records have been sent, we would have to introduce some kind of pre-EndOfPartition message,
which is similar to the current EndOfPartition but does not cause the channels to be released.

> 2. I think it would be helpful to describe how rescaling is handled in Options 2 and 3 (or maybe it's not supported for jobs about to finish).

For Options 2 and 3 we manage the state at the granularity of operators, thus rescaling works the same as with a normal checkpoint.
For example, suppose an operator resides in a task with parallelism 4 and 2 of the subtasks are finished; the state of the operator is then composed
of the state of the 2 remaining subtask instances. If we rescale to 5 after failover, the state of those 2 previously remaining subtasks is re-distributed
across the 5 new subtasks.

If all 4 subtasks were finished before failover, the operator is marked as finished; after failover it stays marked as finished,
and every subtask instance of this operator skips methods like open(), endOfInput() and close() and is excluded from checkpoints
taken after failover.
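
To make the intended restore behavior concrete, here is a very rough, hypothetical sketch (the names are invented; this is not the actual StreamTask code):

// Hypothetical sketch of restoring a subtask whose operator was already fully
// finished in the restored checkpoint. All names are invented for illustration.
class FinishedOnRestoreSketch {

    private final boolean operatorFinishedOnRestore; // read from the checkpoint metadata

    FinishedOnRestoreSketch(boolean operatorFinishedOnRestore) {
        this.operatorFinishedOnRestore = operatorFinishedOnRestore;
    }

    void runLifecycle() throws Exception {
        if (operatorFinishedOnRestore) {
            // Operator was finished when the checkpoint was taken: skip open(),
            // record processing, endOfInput() and close(), and exclude it from
            // any checkpoint taken after the failover.
            return;
        }
        open();
        // ... process records and participate in checkpoints as usual ...
        endOfInput();
        close();
    }

    void open() throws Exception {}
    void endOfInput() throws Exception {}
    void close() throws Exception {}
}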


> 3. Option 3 assumes that the state of a finished task is not used. That's true for operator state, but what about channel state (captured by unaligned checkpoint)?
> I think it still has to be sent downstream which invalidates this Option.

For unaligned checkpoints, if a subtask is marked as finished in one checkpoint, its descendant tasks would wait until all records have been received
from the finished tasks before taking the checkpoint; thus in this case we would not have result partition state, but only channel state for the
downstream tasks that are still running.

In detail, suppose we have a job with the graph A -> B -> C, and suppose that in one checkpoint A has reported FINISHED. The CheckpointCoordinator would
then choose B as the new "source" and trigger the checkpoint via RPC. When task B receives the checkpoint trigger, it knows that all its preceding tasks
are finished, so it waits until every InputChannel has received EndOfPartition from the network (namely inputChannel.onBuffer() is called with
EndOfPartition) and then snapshots the input channels, as normal unaligned checkpoints do on the InputChannel side. This way
we can ensure that finished tasks always have an empty state.
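
As a rough sketch of this triggering path (purely illustrative and based on my understanding; the types and method names are invented and do not mirror the actual classes):

import java.util.List;

// Illustrative sketch of a task whose predecessors are all finished handling an
// RPC checkpoint trigger. Names are invented and are not the actual Flink classes.
class TriggerAfterPredecessorsFinishedSketch {

    interface Channel {
        void awaitEndOfPartition() throws InterruptedException; // block until EoP arrived
        void snapshotBufferedRecords(long checkpointId);        // persist in-flight data
    }

    void onCheckpointTrigger(long checkpointId, List<Channel> inputChannels)
            throws InterruptedException {
        // All predecessors reported FINISHED, so an EndOfPartition is already on the
        // wire for every input channel; wait until each channel has received it.
        for (Channel channel : inputChannels) {
            channel.awaitEndOfPartition();
        }
        // Snapshot the buffered records of each channel, as the normal unaligned
        // checkpoint does on the InputChannel side; the finished upstream tasks
        // themselves end up with an empty state in the checkpoint.
        for (Channel channel : inputChannels) {
            channel.snapshotBufferedRecords(checkpointId);
        }
        // ... then forward the barrier downstream as usual.
    }
}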

I'll also revise the FLIP to make this clearer.

Best,
 Yun




Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Khachatryan Roman
Thanks a lot for your answers Yun,

> In detail, suppose we have a job with the graph A -> B -> C, and suppose that in
> one checkpoint A has reported FINISHED; the CheckpointCoordinator would
> choose B as the new "source" and trigger the checkpoint via RPC. When task B
> receives the checkpoint trigger, it knows that all its preceding tasks
> are finished, so it waits until every InputChannel has received
> EndOfPartition from the network (namely inputChannel.onBuffer() is called with
> EndOfPartition) and then snapshots the input channels, as normal unaligned
> checkpoints do on the InputChannel side. This way we can ensure that
> finished tasks always have an empty state.

Probably it would be simpler to just decline the RPC-triggered checkpoint
if not all inputs of this task are finished (with
CHECKPOINT_DECLINED_TASK_NOT_READY).
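
Just to make the idea concrete, a minimal sketch of such a check (purely illustrative; the types are invented and this is not the actual task code):

import java.util.List;

// Illustrative sketch of declining an RPC-triggered checkpoint while not all
// inputs of the task have delivered EndOfPartition yet. Names are invented.
class DeclineWhileInputsRunningSketch {

    interface Channel {
        boolean hasReceivedEndOfPartition();
    }

    enum TriggerResult { ACCEPTED, DECLINED_TASK_NOT_READY }

    TriggerResult onRpcTrigger(long checkpointId, List<Channel> inputs) {
        for (Channel channel : inputs) {
            if (!channel.hasReceivedEndOfPartition()) {
                // Report CHECKPOINT_DECLINED_TASK_NOT_READY back to the coordinator;
                // as noted later in this thread, that decline reason does not fail the job.
                return TriggerResult.DECLINED_TASK_NOT_READY;
            }
        }
        // All inputs are drained; proceed with the checkpoint as usual.
        return TriggerResult.ACCEPTED;
    }
}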

But I wonder how significantly this waiting for EoP from every input will
delay performing the first checkpoint by B after becoming a new source.
This may in turn impact exactly-once sinks and incremental checkpoints.
Maybe a better option would be to postpone JM notification from source
until its EoP is consumed?

Regards,
Roman


Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
      Hi Roman,

          Many thanks for the feedback!
        > Probably it would be simpler to just decline the RPC-triggered checkpoint
        > if not all inputs of this task are finished (with CHECKPOINT_DECLINED_TASK_NOT_READY).

        > But I wonder how significantly this waiting for EoP from every input will delay performing the first checkpoint
        > by B after becoming a new source. This may in turn impact exactly-once sinks and incremental checkpoints.
        > Maybe a better option would be to postpone JM notification from source until its EoP is consumed?

       I agree that there would indeed be cases where the checkpoint gets slower because it cannot skip
the data in the result partition of the finished upstream task:
            a) For aligned checkpoints this cannot happen, since the downstream tasks always have to
                process the buffers in order.
            b) With unaligned checkpoints enabled, the slower cases might happen if the downstream task processes very
                slowly.

       But since only the result partition of the finished upstream task needs to wait to be processed, the other part of
the execution graph can still perform the unaligned checkpoint normally. I think the average delay caused would
be much lower than with completely aligned checkpoints, but there could still be extremely bad cases where
       the delay is long.

       Declining the RPC-triggered checkpoint would indeed simplify the implementation, but since by default a
       failed checkpoint currently causes a job failover, we have some concerns about directly declining the checkpoint.
       As for postponing the JM notification, since the JM currently has no way to know whether the task has
       received all the EndOfPartition events from its upstream tasks, we might need to introduce a new RPC for notifying that
       state, and since the triggering is not atomic, we may also run into synchronization issues between JM and TM,
       which would introduce some complexity.
      Thus another possible option might be to let the upstream task wait until all pending buffers in the result partition have
      been flushed before it finishes. We could do this wait only for pipelined result partitions so it won't affect batch
      jobs. With this waiting, an unaligned checkpoint could continue to trigger the upstream task and skip the buffers in
      the result partition. Since the result partition state would be kept within the operator state of the last operator, after a failover
      we would find that the last operator has a non-empty state and we would restart the tasks containing this operator to
      resend the snapshotted buffers. Of course this would also introduce some complexity, and since the probability of a long delay
      would be lower than in the completely aligned case, do you think it would be ok for us to view this as an optimization and
      postpone it to future versions?
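
      As a purely illustrative sketch of this option (all names invented, not a proposal for the actual classes):

import java.util.List;

// Illustrative sketch of the "wait for the result partition to be flushed before
// the task finishes" option. All names are invented; this is not Flink code.
class FlushBeforeFinishSketch {

    interface ResultPartition {
        boolean isPipelined();                        // only pipelined partitions need the wait
        boolean hasPendingBuffers();                  // still unconsumed in-flight data?
        void awaitBuffersConsumed() throws InterruptedException;
    }

    void finishTask(List<ResultPartition> partitions) throws InterruptedException {
        for (ResultPartition partition : partitions) {
            // Blocking (batch) partitions are not affected; only pipelined ones wait here.
            if (partition.isPipelined() && partition.hasPendingBuffers()) {
                // Block the finishing task until all pending buffers are consumed, so an
                // unaligned checkpoint triggered in the meantime never has to snapshot
                // this partition's data.
                partition.awaitBuffersConsumed();
            }
        }
        // ... only now emit EndOfPartition and report FINISHED to the JM.
    }
}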

     Best,
     Yun



Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Khachatryan Roman
Hi Yun,

> b)  With unaligned checkpoint enabled, the slower cases might happen if
the downstream task processes very slowly.
I think UC will be the common case with multiple sources each with DoP > 1.
IIUC, waiting for EoP will be needed on each subtask each time one of its
source subtasks finishes.

> But since only the result partition part of the finished upstream need
wait to be processed, the other part of
> the execution graph could  still perform the unaligned checkpoint normally
Yes, but checkpoint completion notification will not be sent until all the
EOPs are processed.

> Declining the RPC-trigger checkpoint would indeed simplify the
implementation, but since currently by default the
> failed checkpoint would cause job failover, thus we might have some
concerns in directly decline the checkpoint.
Not all declines cause job failure, particularly
CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.

> Thus another possible option might be let the upstream task to wait till
all the pending buffers in the result partition has been flushed before get
to finish.
This is what I meant by "postpone JM notification from source". Just
blocking the task thread wouldn't add much complexity, though I'm not sure
if it would cause any problems.

> do you think it would be ok for us to view it as an optimization and
postpone it to future versions ?
I think that's a good idea.

Regards,
Roman



Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
    Hi Roman,

         Many thanks for the feedback and suggestions!

        > I think UC will be the common case with multiple sources each with DoP > 1.
        > IIUC, waiting for EoP will be needed on each subtask each time one of its source subtasks finishes.

        Yes, waiting for EoP would be required for each input channel if we do not specially block the finished
upstream task.

       > Yes, but checkpoint completion notification will not be sent until all the EOPs are processed.
      Indeed, the downstream task that gets triggered must wait to receive EoPs from all of its input channels.
I was initially comparing it with the completely aligned case: the remaining execution graph after the
triggered task could still take normal unaligned checkpoints (e.g. if A -> B -> C -> D, A is finished and B gets
triggered, then B -> C -> D could still take normal unaligned checkpoints). But it still does not bound the
possible maximum delay.

    > Not all declines cause job failure, particularly CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.
    Sorry, I mistook the logic here; CHECKPOINT_DECLINED_TASK_NOT_READY indeed does not cause a failure.
But since after a declined checkpoint we would have to wait one checkpoint interval for the next checkpoint, I also
agree that the following option, where we try to complete every checkpoint, would be the better one.

>> Thus another possible option might be let the upstream task to wait till all the pending buffers in the result partition has been flushed before get to finish.
> This is what I meant by "postpone JM notification from source". Just blocking the task thread wouldn't add much complexity, though I'm not sure if it would cause any problems.

>> do you think it would be ok for us to view it as an optimization and postpone it to future versions ?
> I think that's a good idea.

 Also, sorry that I initially misunderstood the proposal; currently
I do not see explicit problems with waiting for the flush of the pipelined result partition either.
Glad that we have the same viewpoint on this issue. :)

 Best,
  Yun




Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
  Hi all,
I updated the FLIP[1] to reflect the major discussed points in the ML thread:

 1) For the "new" root tasks finished before it received trigger message, previously we proposed
to let JM re-compute and re-trigger the descendant tasks, but after the discussion we realized that
it might cause overhead to JobMaster on cascade finish and large parallelism cases. Another option
might be let the StreamTask do one synchronization with the CheckpointCoordinator before get finished
to be aware of the missed pending checkpoints, since at then EndOfPartitions are not emitted yet, it
could still broadcast barriers to its descendant tasks. I updated the details in this section[2] in the
FLIP.

2) For barrier alignment, we now insert faked barriers into the input channels to avoid
interfering with the checkpoint alignment algorithms. One remaining issue is that in unaligned checkpoint
mode we cannot snapshot an upstream task's result partition once it has finished. One option
to address this is to make the upstream tasks wait for their buffers to be flushed before exiting, and
we would include this in future versions. I updated this part in this section[3] of the FLIP.

3) Some operators like the Sink Committer need to wait for one complete checkpoint before exiting. To support
operators that need to wait for some finalization condition, like the Sink Committer and Async I/O, we
could introduce a new interface to mark this kind of operator and let the runtime wait until the operator has
reached its condition (see the rough sketch below). I updated this part in this section[4] of the FLIP.
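
As a rough sketch of what such a marker interface could look like (the name and method are only placeholders picked up from this thread, not the final API):

// Placeholder sketch of a marker interface for operators that must wait for some
// finalization condition before the task may finish. The interface and method
// names are only illustrative, not the final API of the FLIP.
public interface RequiresFinalization {

    /**
     * Called by the runtime after each completed checkpoint (and possibly at other
     * points). The task may only finish once this returns true, e.g. the Sink
     * Committer after it has committed for one final checkpoint, and Async I/O
     * once all outstanding requests have completed.
     */
    boolean isFinalized();
}

The runtime would then keep calling isFinalized() after each completed checkpoint and only let the operator chain finish once it returns true.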

Could you have another look at the FLIP and the pending issues? Any feedback is warmly welcomed
and appreciated. Many thanks!

Best,
 Yun

[1] https://cwiki.apache.org/confluence/x/mw-ZCQ
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-ExtendthetaskBarrierAlignment
[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-SupportOperatorsWaitingForCheckpointCompleteBeforeFinish



Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
  Hi all,

We had some offline discussions together with @Arvid, @Roman and @Aljoscha, and I'd
like to post some of the points we discussed:

1) For the problem that the "new" root task coincidently finished before getting triggered
successfully, we have listed two options in the FLIP-147[1], for the first version, now we are not tend
to go with the first option that JM would re-compute and re-trigger new sources when it realized
some tasks are not triggered successfully. This option would avoid the complexity of adding
new PRC and duplicating task states, and in average case it would not cause too much
overhead.

2) Regarding how to support operators like the Sink Committer waiting for one complete checkpoint
before exiting, this is more an issue of how to use checkpoints after tasks have finished than of
how to achieve checkpoints after tasks have finished, so we would like to leave this part out of
the current discussion. We will discuss and solve this issue separately after FLIP-147 is done.

Best,
 Yun


[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Aljoscha Krettek-2
Thanks for the summary! I think we can now move towards a [VOTE] thread,
right?

On 2021/01/15 13:43, Yun Gao wrote:
>1) For the problem that the "new" root task coincidently finished
>before getting triggered successfully, we have listed two options in
>the FLIP-147[1], for the first version, now we are not tend to go with
>the first option that JM would re-compute and re-trigger new sources
>when it realized some tasks are not triggered successfully. This option
>would avoid the complexity of adding new PRC and duplicating task
>states, and in average case it would not cause too much overhead.

You wrote "we are *not* tend to go with the first option", but I think
you meant to write "we tend to *now* go with the first option", right?
That's also how it is in the FLIP, I just wanted to clarify for the
mailing list.

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
Hi Aljoscha,

    I think so, since we don't seem to have any remaining divergences or new objections now. I'll open the vote then. Many thanks!

Best,
 Yun

