[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished


Yun Gao
Hi, devs & users

As discussed in FLIP-131 [1], Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem in streaming mode is that Flink currently does not support checkpoints after some tasks have finished, which causes problems for bounded or mixed jobs:

        1. Flink exactly-once sinks rely on checkpoints to ensure that data is not replayed before it has been committed to external systems in streaming mode. If sources are bounded and checkpoints are disabled after some tasks have finished, the data sent after the last checkpoint can never be committed. This issue has been reported several times on the user ML [2][3][4] and was further brought up when working on FLIP-143: Unified Sink API [5].
        2. Jobs with both bounded and unbounded sources might have to replay a large number of records after failover, because no periodic checkpoints are taken after the bounded sources have finished.

Therefore, we propose to also support checkpoints after some tasks have finished. You can find more details in FLIP-147 [6].
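
To make problem 1 concrete, here is a minimal toy sketch of a sink that only commits when a checkpoint completes. It is not Flink's real sink API: the class and method names (CheckpointCommitSinkSketch, write, notifyCheckpointComplete) are made up for illustration, and the "external system" is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not a Flink API): a sink that makes data visible only on checkpoint completion. */
final class CheckpointCommitSinkSketch {
    private final List<String> pending = new ArrayList<>();   // written since the last checkpoint
    private final List<String> committed = new ArrayList<>(); // visible in the external system

    /** Buffers a record; it is not yet visible externally. */
    void write(String record) {
        pending.add(record);
    }

    /** Called when a checkpoint completes: everything written so far is committed. */
    void notifyCheckpointComplete() {
        committed.addAll(pending);
        pending.clear();
    }

    List<String> committed() {
        return new ArrayList<>(committed);
    }

    List<String> uncommitted() {
        return new ArrayList<>(pending);
    }

    public static void main(String[] args) {
        CheckpointCommitSinkSketch sink = new CheckpointCommitSinkSketch();
        sink.write("a");
        sink.write("b");
        sink.notifyCheckpointComplete(); // last checkpoint before some task finishes
        sink.write("c");                 // sent after checkpointing has stopped
        System.out.println("committed=" + sink.committed()
                + ", uncommitted=" + sink.uncommitted());
    }
}
```

In this model the record written after the last checkpoint stays in the pending list forever unless another checkpoint can still be taken, which is the gap the proposal targets.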
Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Arvid Heise-3
Hi Yun,

Thank you for starting the discussion. This will solve one of the
long-standing issues [1] that confuse users. I'm also a big fan of option
3. It is also a bit closer to Chandy-Lamport again.

A couple of comments:

1) You call the tasks that get the barriers injected leaf nodes, which
would make the sinks the root nodes. That is very similar to how graphs in
relational algebra are labeled. However, I have the feeling that in Flink
we rather iterate from sources to sinks, making the sources the root nodes
and the sinks the leaf nodes. That said, I have no clue how it's done in
similar cases, so please take that hint cautiously.
2) I'd make the algorithm that finds the subtasks iterative and have it
react in CheckpointCoordinator. Let's assume that we inject the barrier at
all root subtasks (initially all sources). Then, in the iterative
algorithm, whenever root A finishes, it checks for each connected subtask B
whether B has any unfinished upstream task left. If not, B becomes a new
root. That would only require touching a part of the job graph, but would
need some callback from JobManager to CheckpointCoordinator.
2b) We also need to be careful about out-of-sync updates: if the root is
about to finish, we could send the barrier to it from
CheckpointCoordinator, but by the time it arrives, the subtask has already
finished.
3) An implied change is that checkpoints are no longer aborted at
EndOfPartition, which is good, but should perhaps be stated explicitly.
4) The interaction between unaligned checkpoints and EndOfPartition is a
bit ambiguous: What happens when an unaligned checkpoint is started and
then one input channel contains the EndOfPartition event? From the written
description, it sounds to me like we move back to an aligned checkpoint
for the whole receiving task. However, that is neither easily possible nor
necessary. IMHO it would be enough to also store the EndOfPartition in the
channel state.
5) I'd expand the recovery section a bit. It would be the first time that
we recover an incomplete DAG. AFAIK the subtasks are deployed before the
state is recovered, so at some point, the subtasks either need to be
removed again or maybe we could even avoid them being created in the first
place.
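
The iterative idea in 2) could be sketched roughly as follows. This is only an illustration under assumptions: RootTrackerSketch and onSubtaskFinished are hypothetical names rather than existing Flink classes or callbacks, subtasks are identified by plain strings, and the graph is given as a map from each subtask to its directly connected downstream subtasks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper (not a Flink class): tracks which subtasks are currently roots. */
final class RootTrackerSketch {
    private final Map<String, List<String>> downstream;
    private final Map<String, Integer> unfinishedUpstreams = new HashMap<>();
    private final Set<String> roots = new TreeSet<>();
    private final Set<String> finished = new HashSet<>();

    RootTrackerSketch(Map<String, List<String>> downstream) {
        this.downstream = downstream;
        // Count, for every subtask, how many upstream subtasks are still unfinished.
        for (Map.Entry<String, List<String>> e : downstream.entrySet()) {
            unfinishedUpstreams.putIfAbsent(e.getKey(), 0);
            for (String consumer : e.getValue()) {
                unfinishedUpstreams.merge(consumer, 1, Integer::sum);
            }
        }
        // Initially the roots are the subtasks without any upstream, i.e. the sources.
        for (Map.Entry<String, Integer> e : unfinishedUpstreams.entrySet()) {
            if (e.getValue() == 0) {
                roots.add(e.getKey());
            }
        }
    }

    /** Assumed callback when a subtask finishes; returns the subtasks that just became roots. */
    List<String> onSubtaskFinished(String subtask) {
        List<String> newRoots = new ArrayList<>();
        if (!finished.add(subtask)) {
            return newRoots; // duplicate notification, nothing to do
        }
        roots.remove(subtask);
        // Only the directly connected subtasks are touched, not the whole graph.
        for (String consumer : downstream.getOrDefault(subtask, Collections.emptyList())) {
            int left = unfinishedUpstreams.merge(consumer, -1, Integer::sum);
            if (left == 0 && !finished.contains(consumer)) {
                roots.add(consumer);
                newRoots.add(consumer);
            }
        }
        return newRoots;
    }

    Set<String> roots() {
        return new TreeSet<>(roots);
    }
}
```

For a graph src1 -> map, src2 -> map, map -> sink, the roots start as src1 and src2; map only becomes a root once both sources have reported that they finished.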

[1] https://issues.apache.org/jira/browse/FLINK-2491

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
Hi Arvid,
Many thanks for the insightful comments! I have added my responses below the quoted text:

>> 1) You call the tasks that get the barriers injected leaf nodes, which would make the sinks the root nodes. That is very similar to how graphs in relational algebra are labeled. However, I got the feeling that in Flink, we rather iterate from sources to sink, making the sources root nodes and the sinks the leaf nodes. However, I have no clue how it's done in similar cases, so please take that hint cautiously.
>> 2) I'd make the algorithm to find the subtasks iterative and react in CheckpointCoordinator. Let's assume that we inject the barrier at all root subtasks (initially all sources). So in the iterative algorithm, whenever root A finishes, it looks at all connected subtasks B if they have any upstream task left. If not B becomes a new root. That would require to only touch a part of the job graph, but would require some callback from JobManager to CheckpointCoordinator.

I think "leaf nodes" was a poor choice of name. In fact, I think we have the same idea: we start from the source nodes and find all the nodes whose preceding nodes have all finished. It would be much better to call these nodes (the ones we would trigger) "root nodes". I'll modify the FLIP to rename them to "root nodes".

>> 2b) We also need to be careful for out-of-sync updates: if the root is about to finish, we could send the barrier to it from CheckpointCoordinator, but at the time it arrives, the subtask is finished already.

Exactly. When a checkpoint tries to trigger a task but finds that the task is no longer there, it may then further check whether the task has finished. If so, it should re-check the task's descendants to see whether there are new "root nodes" to trigger.
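
A rough sketch of this re-check, under assumptions: TriggerRetrySketch and its methods are hypothetical names and not Flink APIs, tasks are plain strings, and the set of finished tasks stands in for what the coordinator learns when a trigger message finds no running task.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch (not a Flink class) of re-triggering when a presumed root already finished. */
final class TriggerRetrySketch {
    private final Map<String, List<String>> upstream = new HashMap<>();
    private final Map<String, List<String>> downstream = new HashMap<>();
    private final Set<String> finished = new HashSet<>(); // actual state on the task side

    void addEdge(String from, String to) {
        downstream.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        upstream.computeIfAbsent(to, k -> new ArrayList<>()).add(from);
    }

    void markFinished(String task) {
        finished.add(task);
    }

    /** Tries to trigger the presumed roots; returns the tasks that were actually triggered. */
    Set<String> trigger(Collection<String> presumedRoots) {
        Set<String> triggered = new LinkedHashSet<>();
        Set<String> seen = new HashSet<>(presumedRoots);
        Deque<String> toTry = new ArrayDeque<>(presumedRoots);
        while (!toTry.isEmpty()) {
            String task = toTry.poll();
            if (!finished.contains(task)) {
                triggered.add(task); // the trigger reached a running task
                continue;
            }
            // The task finished in the meantime: re-check its descendants for new roots.
            for (String d : downstream.getOrDefault(task, Collections.emptyList())) {
                boolean allUpstreamFinished = finished.containsAll(upstream.get(d));
                if (allUpstreamFinished && seen.add(d)) {
                    toTry.add(d);
                }
            }
        }
        return triggered;
    }
}
```

A descendant that still has a running upstream task is not triggered directly here, since it would receive the barrier from that upstream task as usual.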

>> 3) An implied change is that checkpoints are not aborted anymore at EndOfPartition, which is good, but might be explicitly added.

Yes, currently barrier alignment fails the current checkpoint on EndOfPartition, and we would change that behavior.

>> 4) The interaction between unaligned checkpoint and EndOfPartition is a bit ambiguous: What happens when an unaligned checkpoint is started and then one input channel contains the EndOfPartition event? From the written description, it sounds to me like, we move back to an aligned checkpoint for the whole receiving task. However, that is neither easily possible nor necessary. Imho it would be enough to also store the EndOfPartition in the channel state.

Many thanks for the suggestions on this issue; in fact, I was stuck on it for some time. Previously, one implementation detail that bothered me was that EndOfPartition does not seem to be able to overtake the previous buffers as easily as CheckpointBarrier does; otherwise, it might start destroying the input channels once all EndOfPartitions are received.

Therefore, we could also persist the channels with EndOfPartition:

1. Start persisting the channels when CheckpointUnaligner receives a barrier (if not all preceding tasks have finished) or receives the trigger (if all preceding tasks have finished).
2. Persisting actually stops when onBuffer receives EndOfPartition.

However, after the last channel stops persisting, CheckpointUnaligner still needs to wait until all the previous buffers are processed before completing the allBarriersReceivedFuture. Therefore, it would not be able to accelerate the checkpoint in this case.

After some rethinking today, I now think we might insert some additional virtual events into receivedBuffer when EndOfPartition is received and allow these virtual events to overtake the previous buffers. I'll double-check whether this is feasible; please let me know if there are other solutions to this issue :).
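
The two persisting steps above can be modeled with a small toy class. This is a sketch under assumptions, not the actual CheckpointUnaligner code: ChannelPersistSketch, startPersisting and allChannelsStopped are made-up names, channels are integers, buffers are strings, and the EndOfPartition event is represented by a marker string.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model (not Flink code) of persisting channel state until EndOfPartition. */
final class ChannelPersistSketch {
    static final String END_OF_PARTITION = "EndOfPartition"; // stand-in for the real event

    private final Set<Integer> persisting = new HashSet<>();
    private final Map<Integer, List<String>> channelState = new TreeMap<>();

    /** Step 1: start persisting the given channels (on barrier or on trigger). */
    void startPersisting(Collection<Integer> channels) {
        for (Integer c : channels) {
            persisting.add(c);
            channelState.put(c, new ArrayList<>());
        }
    }

    /** Step 2: record buffers while persisting; stop for a channel on EndOfPartition. */
    void onBuffer(int channel, String buffer) {
        if (!persisting.contains(channel)) {
            return;
        }
        if (END_OF_PARTITION.equals(buffer)) {
            persisting.remove(channel);
        } else {
            channelState.get(channel).add(buffer);
        }
    }

    /** True once every channel has seen EndOfPartition. */
    boolean allChannelsStopped() {
        return persisting.isEmpty();
    }

    List<String> persisted(int channel) {
        return new ArrayList<>(channelState.get(channel));
    }
}
```

Because EndOfPartition arrives behind all earlier buffers of its channel, allChannelsStopped() in this model only turns true after every earlier buffer has passed through onBuffer, which mirrors why this variant would not accelerate the checkpoint.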

>> 5) I'd expand the recovery section a bit. It would be the first time that we recover an incomplete DAG. Afaik the subtasks are deployed before the state is recovered, so at some point, the subtasks either need to be removed again or maybe we could even avoid them being created in the first place.

I also agree that eventually we should not "restart" the finished tasks in some way. It seems that not starting them in the first place would be better. We should be able to keep additional information in the checkpoint metadata about which operators are fully finished, and the scheduler could restore the status of the tasks when restoring from a previous checkpoint. This would also require some modifications on the task side to support input channels that are already finished at startup.

But for the first version, I think we might simplify this by still restarting all the tasks, but letting the finished sources exit directly? The new Source API would terminate directly since there are no pending splits, and the legacy sources would be handled specially by skipping execution if the source operator was fully finished before. We would be able to move to the final solution gradually in later steps.
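
As a minimal sketch of the first-version behavior, assuming the checkpoint metadata simply carries a set of fully finished operator IDs (FinishedOnRestoreSketch and its methods are hypothetical names, not Flink classes):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch (not a Flink class): finished-operator bookkeeping used on restore. */
final class FinishedOnRestoreSketch {
    // Assumed extra field in the checkpoint metadata: operators that were fully finished.
    private final Set<String> fullyFinishedOperators;

    FinishedOnRestoreSketch(Set<String> fullyFinishedOperators) {
        this.fullyFinishedOperators = new HashSet<>(fullyFinishedOperators);
    }

    /** First version: the task is still restarted, but a finished source skips execution. */
    boolean shouldExitDirectly(String operatorId) {
        return fullyFinishedOperators.contains(operatorId);
    }

    /** Returns the restarted sources that actually execute after the restore. */
    List<String> sourcesThatRun(List<String> restartedSources) {
        List<String> running = new ArrayList<>();
        for (String source : restartedSources) {
            if (!shouldExitDirectly(source)) {
                running.add(source);
            }
        }
        return running;
    }

    public static void main(String[] args) {
        FinishedOnRestoreSketch meta =
                new FinishedOnRestoreSketch(new HashSet<>(Arrays.asList("bounded-source")));
        System.out.println(meta.sourcesThatRun(
                Arrays.asList("bounded-source", "unbounded-source")));
    }
}
```

The later, final solution would use the same bookkeeping on the scheduler side so that such tasks are not deployed at all.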

Best,
Yun


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Arvid Heise-3
Hi Yun,

4) Yes, the interaction is not trivial, and I have not completely thought
it through either. But in general, I'm currently at the point where I
think that we also need non-checkpoint-related events in unaligned
checkpoints. So just keep in mind that we might converge at this point
anyhow.

In general, what helps in this case is to remember that no unaligned
checkpoint barrier is ever going to overtake EndOfPartition. So we can
completely ignore the problem of how to store and restore output buffers
of a completed task (also important for the next point).

5) I think we are on the same page, and I completely agree that for the
MVP/first version, it's completely fine to start and immediately stop. A
tad better would be to not even start the processing loop.

On Mon, Oct 12, 2020 at 6:18 PM Yun Gao <[hidden email]> wrote:

> Hi Arvid,
>
> Very thanks for the insightful comments! I added the responses for this
> issue under the quota:
>
> >> 1) You call the tasks that get the barriers injected leaf nodes, which
> would make the > sinks the root nodes. That is very similar to how graphs
> in relational algebra are labeled. However, I got the feeling that in
> Flink, we rather iterate from sources to sink, making the sources root
> nodes and the sinks the leaf nodes. However, I have no clue how it's done
> in similar cases, so please take that hint cautiously.
>
> >> 2) I'd make the algorithm to find the subtasks iterative and react in
> CheckpointCoordinator. Let's assume that we inject the barrier at all root
> subtasks (initially all sources). So in the iterative algorithm, whenever
> root A finishes, it looks at all connected subtasks B if they have any
> upstream task left. If not B becomes a new root. That would require to only
> touch a part of the job graph, but would require some callback from
> JobManager to CheckpointCoordinator.
>
>
> I think I should have used a bad name of "leaf nodes", in fact I think we
> should have the same thoughts that we start with the source nodes to find
> all the nodes whose precedent nodes are all finished. It would be much
> better to call these nodes (which we would trigger) as "root nodes". I'll
> modify the FLIP to change the names to "root nodes".
>
> >> 2b) We also need to be careful for out-of-sync updates: if the root is
> about to finish, we could send the barrier to it from
> CheckpointCoordinator, but at the time it arrives, the subtask is finished
> already.
>
> Exactly. When the checkpoint triggers a task but found the task is not
> there, it may then further check if the task has been finished, if so, it
> should then re-check its descendants to see if there are new "root nodes"
> to trigger.
>
> >> 3) An implied change is that checkpoints are not aborted anymore at
> EndOfPartition, which is good, but might be explicitly added.
>
> Yes, currently barrier alignment would fail the current checkpoint on
> EndOfPartition, and we would modify the behavior.
>
> >> 4) The interaction between unaligned checkpoint and EndOfPartition is a
> bit ambiguous: What happens when an unaligned checkpoint is started and
> then one input channel contains the EndOfPartition event? From the written
> description, it sounds to me like, we move back to an aligned checkpoint
> for the whole receiving task. However, that is neither easily possible nor
> necessary. Imho it would be enough to also store the EndOfPartition in the
> channel state.
>
>
> Very thanks for the suggestions on this issue and in fact I did stuck on
> it for some time. Previously for me one implementation detail issue is that
> EndOfPartition seems not be able to overtake the previous buffers easily as
> CheckpointBarrier does, otherwise it might start destroying the input
> channels if all EndOfPartitions are received.
>
> Therefore, although we could also persistent the channels with
> EndOfPartition:
>
> 1. Start persisting the channels when CheckpointUnaligner received barrier
> (if not all precendant tasks are finished) or received triggering (if all
> precendant tasks are finished).
>
> 2. The persisting actually stops when onBuffer received EndOfPartition.
>
> After the last channel stopped persisting, CheckpointUnaligner still need
> to wait till all the previous buffers are processed before complete the
> allBarriersReceivedFuture. Therefore it would not be able to accelerate the
> checkpoint in this case.
>
> After some rethinking today currently I think we might inserts some
> additional virtual events into receivedBuffer when received EndOfPartition
> and allows these virtual events to overtake the previous buffers. I'll try
> to double check if it is feasible and let me know if there are also other
> solutions on this issue :).
>
> > 5) I'd expand the recovery section a bit. It would be the first time
> that we recover an incomplete DAG. Afaik the subtasks are deployed before
> the state is recovered, so at some point, the subtasks either need to be
> removed again or maybe we could even avoid them being created in the first
> place.
>
> I also agree that finally we should not "restarted" the finished tasks in
> some way. It seems not start it in the first place would be better. We
> should be able to bookkeep additional information in the checkpoint meta
> about which operators are fully finished, and the scheduler could restore
> the status of tasks on restoring from previous checkpoints. It would also
> requires some modification in the task side to support input channels that
> are finished on starting.
>
> But in the first version, I think we might simplify this issue by still
> restart all the tasks, but let the finished sources to exit directly? The
> new Source API would terminate directly since there is no pending splits
> and the legacy sources would be dealt specially by skipped execution if the
> source operator is fully finished before. We would be able to turn to the
> final solution gradually in the next steps.
>
>
> Best,
>
> Yun
>

--

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
Hi Arvid,
Many thanks for the comments!
>>> 4) Yes, the interaction is not trivial and also I have not completely thought it through. But in general, I'm currently at the point where I think that we also need non-checkpoint related events in unaligned checkpoints. So just keep that in mind, that we might converge anyhow at this point.
I also agree that it would be better to keep the unaligned checkpoint behavior on EndOfPartition; I will double-check this issue again.

>>> In general, what is helping in this case is to remember that there is no unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we can completely ignore the problem on how to store and restore output buffers of a completed task (also important for the next point).
Exactly, we should not need to persist the output buffers for the completed tasks, and that would simplify the implementation a lot.

>>> 5) I think we are on the same page and I completely agree that for the MVP/first version, it's completely fine to start and immediately stop. A tad better would be to not even start the processing loop.
I also agree with this part. We would keep optimizing the implementation after the first version.

Best,
Yun  




------------------------------------------------------------------
From:Arvid Heise <[hidden email]>
Send Time:2020 Oct. 13 (Tue.) 03:39
To:Yun Gao <[hidden email]>
Cc:Flink Dev <[hidden email]>; User-Flink <[hidden email]>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

4) Yes, the interaction is not trivial and also I have not completely thought it through. But in general, I'm currently at the point where I think that we also need non-checkpoint related events in unaligned checkpoints. So just keep that in mind, that we might converge anyhow at this point.

In general, what is helping in this case is to remember that there is no unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we can completely ignore the problem on how to store and restore output buffers of a completed task (also important for the next point).

5) I think we are on the same page and I completely agree that for the MVP/first version, it's completely fine to start and immediately stop. A tad better would be to not even start the processing loop.

On Mon, Oct 12, 2020 at 6:18 PM Yun Gao <[hidden email]> wrote:

Hi Arvid,
Many thanks for the insightful comments! I added my responses under the quotes:
>> 1) You call the tasks that get the barriers injected leaf nodes, which would make the sinks the root nodes. That is very similar to how graphs in relational algebra are labeled. However, I got the feeling that in Flink, we rather iterate from sources to sink, making the sources root nodes and the sinks the leaf nodes. However, I have no clue how it's done in similar cases, so please take that hint cautiously.
>> 2) I'd make the algorithm to find the subtasks iterative and react in CheckpointCoordinator. Let's assume that we inject the barrier at all root subtasks (initially all sources). So in the iterative algorithm, whenever root A finishes, it looks at all connected subtasks B if they have any upstream task left. If not B becomes a new root. That would require to only touch a part of the job graph, but would require some callback from JobManager to CheckpointCoordinator.

I think "leaf nodes" was a poor choice of name. In fact, I think we have the same idea: we start from the source nodes and find all the nodes whose preceding nodes are all finished. It would be much better to call these nodes (which we would trigger) "root nodes". I'll modify the FLIP to change the name to "root nodes".
>> 2b) We also need to be careful for out-of-sync updates: if the root is about to finish, we could send the barrier to it from CheckpointCoordinator, but at the time it arrives, the subtask is finished already.
Exactly. When the checkpoint tries to trigger a task but finds that the task is not there, it may then check whether the task has finished; if so, it should re-check the task's descendants to see if there are new "root nodes" to trigger.
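
The root-node rule discussed in 1), 2) and 2b) can be illustrated with a minimal sketch. It assumes the job is given as a plain map from each subtask to its upstream subtasks; the class `RootNodeSketch` and both method names are hypothetical and are not part of Flink or of the FLIP. It only shows the selection logic, not how CheckpointCoordinator would be notified.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not a Flink class) that models the "root node" rule from this thread. */
final class RootNodeSketch {

    /**
     * Returns the unfinished subtasks whose upstream subtasks have all finished.
     * These are the subtasks a checkpoint would have to trigger directly.
     *
     * @param upstream for every subtask, the subtasks it reads from (empty for sources)
     * @param finished the subtasks known to be finished
     */
    static List<String> findRoots(Map<String, List<String>> upstream, Set<String> finished) {
        List<String> roots = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : upstream.entrySet()) {
            boolean running = !finished.contains(entry.getKey());
            if (running && finished.containsAll(entry.getValue())) {
                roots.add(entry.getKey());
            }
        }
        Collections.sort(roots); // deterministic order for the example
        return roots;
    }

    /**
     * Iterative step: when justFinished finishes (or a trigger finds it already gone),
     * only its direct consumers are re-checked and promoted if nothing upstream is left.
     */
    static List<String> newRootsAfterFinish(
            Map<String, List<String>> upstream, Set<String> finished, String justFinished) {
        Set<String> updated = new HashSet<>(finished);
        updated.add(justFinished);
        List<String> promoted = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : upstream.entrySet()) {
            boolean consumer = entry.getValue().contains(justFinished);
            boolean running = !updated.contains(entry.getKey());
            if (consumer && running && updated.containsAll(entry.getValue())) {
                promoted.add(entry.getKey());
            }
        }
        Collections.sort(promoted);
        return promoted;
    }
}
```

For a graph src1 -> map -> join <- src2, finishing src1 promotes only map; join is promoted later, once both map and src2 have finished. The iterative variant touches only the consumers of the finished subtask, which is the "only touch a part of the job graph" property mentioned in 2).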
>> 3) An implied change is that checkpoints are not aborted anymore at EndOfPartition, which is good, but might be explicitly added.
Yes, currently barrier alignment would fail the current checkpoint on EndOfPartition, and we would modify the behavior.
>> 4) The interaction between unaligned checkpoint and EndOfPartition is a bit ambiguous: What happens when an unaligned checkpoint is started and then one input channel contains the EndOfPartition event? From the written description, it sounds to me like, we move back to an aligned checkpoint for the whole receiving task. However, that is neither easily possible nor necessary. Imho it would be enough to also store the EndOfPartition in the channel state.

Many thanks for the suggestions on this issue; in fact I was stuck on it for some time. Previously, one implementation detail that troubled me was that EndOfPartition does not seem able to overtake the previous buffers as easily as CheckpointBarrier does; otherwise it might start destroying the input channels once all EndOfPartitions are received.
Therefore, although we could also persist the channels with EndOfPartition:
1. Start persisting the channels when CheckpointUnaligner receives the barrier (if not all preceding tasks are finished) or receives the trigger (if all preceding tasks are finished).
2. Persisting actually stops when onBuffer receives EndOfPartition.
After the last channel stops persisting, CheckpointUnaligner still needs to wait until all the previous buffers are processed before completing the allBarriersReceivedFuture. Therefore, it would not be able to accelerate the checkpoint in this case.
After some rethinking today, I now think we might insert some additional virtual events into receivedBuffer when EndOfPartition is received and allow these virtual events to overtake the previous buffers. I'll double-check whether this is feasible; please let me know if there are other solutions to this issue :).
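
The two persisting steps above can be modeled with a small toy sketch. It is not Flink's CheckpointUnaligner: the class `ChannelPersistSketch` and all of its methods are hypothetical, buffers are plain strings, and storing the EndOfPartition marker in the channel state follows the suggestion in 4) rather than any existing behavior. The ordering problem described above (EndOfPartition cannot overtake earlier buffers) is deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical toy model (not Flink's CheckpointUnaligner) of per-channel persisting. */
final class ChannelPersistSketch {
    static final String END_OF_PARTITION = "EndOfPartition";

    private final Map<Integer, List<String>> channelState = new HashMap<>();
    private final Set<Integer> persisting = new HashSet<>();
    private final Set<Integer> ended = new HashSet<>();
    private boolean started;

    ChannelPersistSketch(int numChannels) {
        for (int ch = 0; ch < numChannels; ch++) {
            channelState.put(ch, new ArrayList<>());
        }
    }

    /** Step 1: the first barrier or a direct trigger starts persisting on all open channels. */
    void startCheckpoint() {
        started = true;
        for (Integer ch : channelState.keySet()) {
            if (!ended.contains(ch)) {
                persisting.add(ch);
            }
        }
    }

    /** Buffers are recorded only while their channel is still being persisted. */
    void onBuffer(int channel, String data) {
        if (persisting.contains(channel)) {
            channelState.get(channel).add(data);
        }
    }

    /** The channel's own barrier arrived, so nothing after it belongs to this checkpoint. */
    void onBarrier(int channel) {
        persisting.remove(channel);
    }

    /** Step 2: EndOfPartition stops persisting; the event itself is kept in the channel state. */
    void onEndOfPartition(int channel) {
        ended.add(channel);
        if (persisting.remove(channel)) {
            channelState.get(channel).add(END_OF_PARTITION);
        }
    }

    /** True once the checkpoint has started and no channel is being persisted anymore. */
    boolean allChannelsDone() {
        return started && persisting.isEmpty();
    }

    List<String> stateOf(int channel) {
        return channelState.get(channel);
    }
}
```

In this model a channel that ends during the checkpoint is treated like a channel whose barrier arrived, except that the end marker is recorded so it could be replayed on recovery.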
>> 5) I'd expand the recovery section a bit. It would be the first time that we recover an incomplete DAG. Afaik the subtasks are deployed before the state is recovered, so at some point, the subtasks either need to be removed again or maybe we could even avoid them being created in the first place.
I also agree that eventually we should not restart the finished tasks at all; not starting them in the first place would be better. We should be able to keep additional information in the checkpoint metadata about which operators are fully finished, and the scheduler could restore the status of tasks when restoring from previous checkpoints. It would also require some modification on the task side to support input channels that are already finished on startup.
But in the first version, I think we might simplify this by still restarting all the tasks and letting the finished sources exit directly. The new Source API would terminate directly since there are no pending splits, and the legacy sources would be handled specially by skipping execution if the source operator was fully finished before. We would be able to move to the final solution gradually in the next steps.
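
As a rough illustration of the first-version recovery behavior described above, the following sketch assumes the checkpoint metadata exposes a set of fully finished operators. The class `RestoreSketch`, the `Action` enum, and the operator names are all hypothetical; nothing here reflects the actual checkpoint metadata format.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch of the first-version restore rule; not a Flink class. */
final class RestoreSketch {
    enum Action { RUN, EXIT_IMMEDIATELY }

    /**
     * All operators are restarted, but a source recorded as fully finished exits directly.
     * Finished non-source operators still run and end once their inputs end.
     *
     * @param operators     all operators of the job
     * @param sources       the operators that are sources
     * @param fullyFinished operators recorded as fully finished in the checkpoint (assumed)
     */
    static Map<String, Action> planRestore(
            List<String> operators, Set<String> sources, Set<String> fullyFinished) {
        Map<String, Action> plan = new TreeMap<>(); // sorted for readable output
        for (String op : operators) {
            boolean finishedSource = sources.contains(op) && fullyFinished.contains(op);
            plan.put(op, finishedSource ? Action.EXIT_IMMEDIATELY : Action.RUN);
        }
        return plan;
    }
}
```

The final solution mentioned above would go further and not deploy the finished subtasks at all, which this sketch does not attempt to model.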

Best,
Yun

------------------------------------------------------------------
From:Arvid Heise <[hidden email]>
Send Time:2020 Oct. 12 (Mon.) 15:38
To:Yun Gao <[hidden email]>
Cc:Flink Dev <[hidden email]>; User-Flink <[hidden email]>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

Thank you for starting the discussion. This will solve one of the long-standing issues [1] that confuse users. I'm also a big fan of option 3. It is also a bit closer to Chandy-Lamport again.

A couple of comments:

1) You call the tasks that get the barriers injected leaf nodes, which would make the sinks the root nodes. That is very similar to how graphs in relational algebra are labeled. However, I got the feeling that in Flink, we rather iterate from sources to sink, making the sources root nodes and the sinks the leaf nodes. However, I have no clue how it's done in similar cases, so please take that hint cautiously.
2) I'd make the algorithm to find the subtasks iterative and react in CheckpointCoordinator. Let's assume that we inject the barrier at all root subtasks (initially all sources). So in the iterative algorithm, whenever root A finishes, it looks at all connected subtasks B if they have any upstream task left. If not B becomes a new root. That would require to only touch a part of the job graph, but would require some callback from JobManager to CheckpointCoordinator.
2b) We also need to be careful for out-of-sync updates: if the root is about to finish, we could send the barrier to it from CheckpointCoordinator, but at the time it arrives, the subtask is finished already.
3) An implied change is that checkpoints are not aborted anymore at EndOfPartition, which is good, but might be explicitly added.
4) The interaction between unaligned checkpoint and EndOfPartition is a bit ambiguous: What happens when an unaligned checkpoint is started and then one input channel contains the EndOfPartition event? From the written description, it sounds to me like, we move back to an aligned checkpoint for the whole receiving task. However, that is neither easily possible nor necessary. Imho it would be enough to also store the EndOfPartition in the channel state.
5) I'd expand the recovery section a bit. It would be the first time that we recover an incomplete DAG. Afaik the subtasks are deployed before the state is recovered, so at some point, the subtasks either need to be removed again or maybe we could even avoid them being created in the first place.

[1] https://issues.apache.org/jira/browse/FLINK-2491
On Fri, Oct 9, 2020 at 8:22 AM Yun Gao <[hidden email]> wrote:
Hi, devs & users

Very sorry for the broken formatting; I am resending the discussion as follows.

As discussed in FLIP-131[1], Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem for the streaming mode is that Flink currently does not support checkpoints after some tasks have finished, which causes problems for bounded or mixed jobs:
        1. Flink exactly-once sinks rely on checkpoints to ensure data won’t be replayed before being committed to external systems in streaming mode. If sources are bounded and checkpoints are disabled after some tasks have finished, the data sent after the last checkpoint can never be committed. This issue has already been reported several times on the user ML[2][3][4] and was further brought up when working on FLIP-143: Unified Sink API [5].
        2. Jobs with both bounded and unbounded sources might have to replay a large number of records after failover because no periodic checkpoints are taken after the bounded sources have finished.

Therefore, we propose to also support checkpoints after some tasks have finished. You can find more details in FLIP-147[6].

Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished



Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Till Rohrmann
Thanks for starting this discussion Yun Gao,

I have three comments/questions:

1) When restarting all tasks independent of the status at checkpoint time
(finished, running, scheduled), we might allocate more resources than we
actually need to run the remaining job. From a scheduling perspective it
would be easier if we already know that certain subtasks don't need to be
rescheduled. I believe this can be an optimization, though.

2) In the section Compatibility, Deprecation and Migration Plan you
mentioned that you want to record operators in the CompletedCheckpoint
which are fully finished. How will this information be used for
constructing a recovered ExecutionGraph? Why wouldn't the same principle
work for the task level?

3) How will checkpointing work together with fully bounded jobs and FLIP-1
(fine grained recovery)?

Cheers,
Till

> On Fri, Oct 9, 2020 at 8:22 AM Yun Gao <[hidden email]> wrote:
> Hi, devs & users
>
> Very sorry for the broken formatting; I am resending the discussion below.
>
>
> As discussed in FLIP-131 [1], Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem in streaming mode is that Flink currently does not support checkpoints after some tasks have finished, which causes problems for bounded or mixed jobs:
>         1. Flink exactly-once sinks rely on checkpoints to ensure data is not replayed before it is committed to external systems in streaming mode. If sources are bounded and checkpoints are disabled after some tasks finish, the data sent after the last checkpoint can never be committed. This issue has already been reported several times on the user ML [2][3][4] and was further brought up when working on FLIP-143: Unified Sink API [5].
>         2. Jobs with both bounded and unbounded sources might have to replay a large number of records after failover because no periodic checkpoints are taken after the bounded sources finish.
>
>
> Therefore, we propose to also support checkpoints after some tasks have finished. You can find more details in FLIP-147 [6].
>
> Best,
> Yun
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
> [2]
> https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
> [3]
> https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
> [4]
> https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
> [5]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> [6]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>
>
> --
>
> Arvid Heise| Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>
>
>

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
Hi Till,
Many thanks for the feedback!
> 1) When restarting all tasks independent of the status at checkpoint time (finished, running, scheduled), we might allocate more resources than we actually need to run the remaining job. From a scheduling perspective it would be easier if we already know that certain subtasks don't need to be rescheduled. I believe this can be an optimization, though.
> 2) In the section Compatibility, Deprecation and Migration Plan you mentioned that you want to record operators in the CompletedCheckpoint which are fully finished. How will this information be used for constructing a recovered ExecutionGraph? Why wouldn't the same principle work for the task level?

I think the first two issues are related. The main reason is that with external checkpoints, a checkpoint might be taken from one job and used in another job, but we do not have a unique ID to match tasks across jobs. Furthermore, users may also change the parallelism of a JobVertex, or even modify the graph structure by adding/removing operators or changing the chaining between operators.
On the other hand, Flink already provides custom UIDs for operators, which makes operators a stable unit for recovery. Checkpoints are currently also organized per operator to support rescaling and job upgrades.
When restarting from a checkpoint with finished operators, we could start only the tasks containing operators that are not fully finished (namely, operators for which some subtasks were still running when the checkpoint was taken). Then, during the execution of a single task, we only initialize/open/run/close the operators that are not fully finished. The Scheduler should be able to compute whether a task contains operators that are not fully finished from the current JobGraph and the operator finished states restored from the checkpoint.
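
To make the restart rule above concrete, here is a minimal sketch of how the set of tasks to restart could be computed. It is only an illustration under assumptions: FinishedOperatorFilter and tasksToRestart are hypothetical names, tasks and operators are modeled as plain strings, and nothing here is taken from the actual Scheduler.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

/** Hypothetical helper, not part of Flink: picks the tasks that still have work to do. */
final class FinishedOperatorFilter {

    /**
     * @param operatorsByTask operator UIDs chained into each task, keyed by task name
     * @param fullyFinishedOperators operator UIDs recorded as fully finished in the checkpoint
     * @return names of the tasks that contain at least one operator that is not fully finished
     */
    static Set<String> tasksToRestart(
            Map<String, List<String>> operatorsByTask, Set<String> fullyFinishedOperators) {
        return operatorsByTask.entrySet().stream()
                // Keep a task as soon as one of its operators is not fully finished.
                .filter(e -> e.getValue().stream()
                        .anyMatch(uid -> !fullyFinishedOperators.contains(uid)))
                .map(Map.Entry::getKey)
                .collect(Collectors.toCollection(TreeSet::new));
    }
}
```

Matching on operator UIDs rather than task IDs is what keeps the rule usable when the chaining or the parallelism changes between the job that took the checkpoint and the job that restores it.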

> 3) How will checkpointing work together with fully bounded jobs and FLIP-1 (fine grained recovery)?
I think it should be compatible with fully bounded jobs and FLIP-1, since it can be viewed as a completion of the current checkpoint mechanism. Concretely:
1. Batch jobs (with blocking execution mode) should not be affected, since checkpoints are not enabled in this case.
2. With the modification, bounded jobs running in pipelined mode would also be supported with checkpoints while they are finishing. As discussed in the FLIP, it should not affect the current behavior after restore for almost all jobs.
3. Region failover and more fine-grained failover should also not be affected: as before, after a failover the failover policy (full/region/fine-grained) decides which tasks to restart, and the checkpoint only decides which state is restored for these tasks. The only difference with this modification is that these tasks might now be restored from a checkpoint taken after some tasks had finished. Since the previously finished tasks would always be skipped, either by not being started or by running an empty execution, and the behavior of the previously running tasks stays unchanged, the overall behavior should not be affected.


Best,
Yun


------------------------------------------------------------------
From:Till Rohrmann <[hidden email]>
Send Time:2020 Oct. 13 (Tue.) 17:25
To:Yun Gao <[hidden email]>
Cc:Arvid Heise <[hidden email]>; Flink Dev <[hidden email]>; User-Flink <[hidden email]>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for starting this discussion Yun Gao,

I have three comments/questions:

1) When restarting all tasks independent of the status at checkpoint time (finished, running, scheduled), we might allocate more resources than we actually need to run the remaining job. From a scheduling perspective it would be easier if we already know that certain subtasks don't need to be rescheduled. I believe this can be an optimization, though.

2) In the section Compatibility, Deprecation and Migration Plan you mentioned that you want to record operators in the CompletedCheckpoint which are fully finished. How will this information be used for constructing a recovered ExecutionGraph? Why wouldn't the same principle work for the task level?

3) How will checkpointing work together with fully bounded jobs and FLIP-1 (fine grained recovery)?

Cheers,
Till
On Tue, Oct 13, 2020 at 9:30 AM Yun Gao <[hidden email]> wrote:

Hi Arvid,
Many thanks for the comments!
>>> 4) Yes, the interaction is not trivial and also I have not completely thought it through. But in general, I'm currently at the point where I think that we also need non-checkpoint related events in unaligned checkpoints. So just keep that in mind, that we might converge anyhow at this point.
I also agree that it would be better to keep the unaligned checkpoint behavior on EndOfPartition; I will double-check this issue again.

>>> In general, what helps in this case is to remember that no unaligned checkpoint barrier is ever going to overtake EndOfPartition. So, we can completely ignore the problem of how to store and restore output buffers of a completed task (also important for the next point).
Exactly, we should not need to persist the output buffers for completed tasks, and that would simplify the implementation a lot.

>>> 5) I think we are on the same page and I completely agree that for the MVP/first version, it's completely fine to start and immediately stop. A tad better would be to not even start the processing loop.
I also agree with this part. We would keep optimizing the implementation after the first version.

Best,
Yun  


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao

Hi all,

    I would like to resume the discussion on supporting checkpoints after tasks have finished :) Based on the previous discussion, we have implemented a PoC [1] to try out the idea. During the PoC we also ran into some possible issues:

    1. To take EndOfPartition into account for barrier alignment on the TM side, we now tend to decouple the EndOfPartition logic from the normal alignment behavior to avoid complex interference (which seems hard to reason about). We could do so by inserting suitable barriers for input channels that have received but not yet processed EndOfPartition. For example, if a task with four inputs has received barrier 2 from two input channels, but the other two inputs do not receive barrier 2 before EndOfPartition because the upstream tasks have finished, we could insert barrier 2 for the last two channels so that we can still complete checkpoint 2.
    2. As we have discussed, if a task finishes while we are triggering the tasks, it would cause a checkpoint failure and we should re-trigger its descendants. But if possible, we think we might skip this issue in the first version to reduce implementation complexity, since it should not affect correctness. We could consider supporting it in later versions.

    3. We would have to add a field isFinished to OperatorState so that we do not re-run finished sources after failover. However, this would require a new version of the checkpoint metadata. Currently Flink has an abstract MetaV2V3SerializerBase, which V2 and V3 extend to share some implementation. To add V4, which differs from V3 in only one field, the current PoC introduces a new MetaV3V4SerializerBase that extends MetaV2V3SerializerBase to share implementation between V3 and V4. This looks a little complex, and we might need a general mechanism to extend the checkpoint metadata format.

    4. With the change, StreamTask would have two types of subclasses according to how triggerCheckpoint is implemented: source tasks that perform checkpoints immediately, and non-source tasks that notify the CheckpointBarrierHandler in some way. However, since we have multiple source tasks (legacy and new source) and multiple non-source tasks (one-input, two-input, multiple-input), several subclasses would share the same implementation, causing code duplication. Currently the PoC introduces a new level of abstraction, namely SourceStreamTasks and NonSourceStreamTasks, but what makes it more complicated is that StreamingIterationHead extends OneInputStreamTask yet needs to perform checkpoints as a source task.
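
The four-input example in point 1 can be sketched as a small model of the alignment bookkeeping. This is only an illustration under assumptions: AlignmentSketch and its methods are hypothetical and much simpler than the real CheckpointBarrierHandler (no unaligned checkpoints, no aborts, no timeouts).

```java
import java.util.BitSet;

/** Hypothetical model of aligned barriers plus EndOfPartition; not Flink code. */
final class AlignmentSketch {
    private final int numChannels;
    private final BitSet barrierSeen = new BitSet(); // channels counted for the current checkpoint
    private final BitSet ended = new BitSet();       // channels that delivered EndOfPartition
    private long currentCheckpointId = -1;
    private boolean aligning = false;

    AlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Returns true if this barrier completes the alignment of its checkpoint. */
    boolean onBarrier(int channel, long checkpointId) {
        if (checkpointId > currentCheckpointId) {
            currentCheckpointId = checkpointId;
            aligning = true;
            barrierSeen.clear();
            // Channels that already ended get an artificial barrier right away.
            barrierSeen.or(ended);
        }
        if (aligning && checkpointId == currentCheckpointId) {
            barrierSeen.set(channel);
        }
        return completeIfAligned();
    }

    /** Returns true if this EndOfPartition completes the alignment of the ongoing checkpoint. */
    boolean onEndOfPartition(int channel) {
        ended.set(channel);
        if (aligning) {
            // Artificial barrier: the channel will never deliver the real one.
            barrierSeen.set(channel);
        }
        return completeIfAligned();
    }

    private boolean completeIfAligned() {
        if (aligning && barrierSeen.cardinality() == numChannels) {
            aligning = false;
            return true;
        }
        return false;
    }
}
```

With four channels, barrier 2 on channels 0 and 1 followed by EndOfPartition on channels 2 and 3 aligns checkpoint 2 on the last event, which is the case described above.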

Glad to hear your opinions!

Best,
 Yun

[1] https://github.com/gaoyunhaii/flink/commits/try_checkpoint_6 , starts from commit f8005be1ab5e5124e981e56db7bdf2908f4a969a.

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Aljoscha Krettek-2
Thanks for the thorough update! I'll answer inline.

On 14.12.20 16:33, Yun Gao wrote:
>      1. To take EndOfPartition into account for barrier alignment on the TM side, we now tend to decouple the EndOfPartition logic from the normal alignment behavior to avoid complex interference (which seems hard to reason about). We could do so by inserting suitable barriers for input channels that have received but not yet processed EndOfPartition. For example, if a task with four inputs has received barrier 2 from two input channels, but the other two inputs do not receive barrier 2 before EndOfPartition because the upstream tasks have finished, we could insert barrier 2 for the last two channels so that we can still complete checkpoint 2.

You mean we would insert "artificial" barriers for barrier 2 in case we
receive  EndOfPartition while other inputs have already received barrier
2? I think that makes sense, yes.

>      2. As we have discussed, if a task finishes while we are triggering the tasks, it would cause a checkpoint failure and we should re-trigger its descendants. But if possible, we think we might skip this issue in the first version to reduce implementation complexity, since it should not affect correctness. We could consider supporting it in later versions.

I think this should be completely fine.

>      3. We would have to add a field isFinished to OperatorState so that we do not re-run finished sources after failover. However, this would require a new version of the checkpoint metadata. Currently Flink has an abstract MetaV2V3SerializerBase, which V2 and V3 extend to share some implementation. To add V4, which differs from V3 in only one field, the current PoC introduces a new MetaV3V4SerializerBase that extends MetaV2V3SerializerBase to share implementation between V3 and V4. This looks a little complex, and we might need a general mechanism to extend the checkpoint metadata format.

This indeed seems complex. Maybe we could switch to using composition
instead of inheritance to make this more extensible?

>      4. With the change, StreamTask would have two types of subclasses according to how triggerCheckpoint is implemented: source tasks that perform checkpoints immediately, and non-source tasks that notify the CheckpointBarrierHandler in some way. However, since we have multiple source tasks (legacy and new source) and multiple non-source tasks (one-input, two-input, multiple-input), several subclasses would share the same implementation, causing code duplication. Currently the PoC introduces a new level of abstraction, namely SourceStreamTasks and NonSourceStreamTasks, but what makes it more complicated is that StreamingIterationHead extends OneInputStreamTask yet needs to perform checkpoints as a source task.

Don't we currently have the same problem? Even right now source tasks
and non-source tasks behave differently when it comes to checkpoints.
Are you saying we should fix that or would the new work introduce even
more duplicate code?

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
     Hi Aljoscha,

        Many thanks for the feedback! For the remaining issues:

      > 1. You mean we would insert "artificial" barriers for barrier 2 in case we receive  EndOfPartition while other inputs have already received barrier 2? I think that makes sense, yes.

      Yes, exactly. I would like to insert "artificial" barriers in case we receive EndOfPartition while other inputs have already received barrier 2, and also for the similar cases where some input channels receive EndOfPartition while checkpoint 2 is ongoing, or where the task is triggered directly for a checkpoint after all its upstream tasks have finished but it has not yet received their EndOfPartition.

     > 3. This indeed seems complex. Maybe we could switch to using composition instead of inheritance to make this more extensible?

    I re-checked the code, and I now think composition would be better for avoiding a complex inheritance hierarchy, by extracting the changed part, `(de)serializeOperatorState`. I'll update the PoC to change this part. Many thanks for the suggestion!
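
As a rough illustration of what composition could look like here, the sketch below injects the version-specific operator state codec into one shared serializer instead of subclassing a base class per version pair. All names (OperatorStateSketch, OperatorStateCodec, V3Codec, V4Codec, MetadataSerializerSketch) and the three-field state are assumptions made for the example; they are not the PoC's classes or Flink's real metadata layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical, simplified operator state: only the fields needed for the sketch. */
final class OperatorStateSketch {
    final String operatorId;
    final int parallelism;
    final boolean finished;

    OperatorStateSketch(String operatorId, int parallelism, boolean finished) {
        this.operatorId = operatorId;
        this.parallelism = parallelism;
        this.finished = finished;
    }
}

/** The version-specific part, pulled out so the outer serializer needs no subclasses. */
interface OperatorStateCodec {
    void write(OperatorStateSketch state, DataOutputStream out) throws IOException;

    OperatorStateSketch read(DataInputStream in) throws IOException;
}

final class V3Codec implements OperatorStateCodec {
    @Override
    public void write(OperatorStateSketch state, DataOutputStream out) throws IOException {
        out.writeUTF(state.operatorId);
        out.writeInt(state.parallelism);
    }

    @Override
    public OperatorStateSketch read(DataInputStream in) throws IOException {
        // V3 has no finished flag, so restored operators count as not finished.
        return new OperatorStateSketch(in.readUTF(), in.readInt(), false);
    }
}

/** V4 reuses V3 by delegation and appends the one extra field. */
final class V4Codec implements OperatorStateCodec {
    private final OperatorStateCodec v3 = new V3Codec();

    @Override
    public void write(OperatorStateSketch state, DataOutputStream out) throws IOException {
        v3.write(state, out);
        out.writeBoolean(state.finished);
    }

    @Override
    public OperatorStateSketch read(DataInputStream in) throws IOException {
        OperatorStateSketch base = v3.read(in);
        return new OperatorStateSketch(base.operatorId, base.parallelism, in.readBoolean());
    }
}

/** Shared outer serializer; the codec is injected instead of inherited. */
final class MetadataSerializerSketch {
    private final OperatorStateCodec codec;

    MetadataSerializerSketch(OperatorStateCodec codec) {
        this.codec = codec;
    }

    byte[] serialize(OperatorStateSketch state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            codec.write(state, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    OperatorStateSketch deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return codec.read(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Delegation works here only because V4 appends a field after everything V3 writes; a version that changes fields in the middle of the layout would need a different split.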

   > 4. Don't we currently have the same problem? Even right now source tasks and non-source tasks behave differently when it comes to checkpoints. Are you saying we should fix that or would the new work introduce even more duplicate code?

  Currently, since we never trigger non-source tasks, the triggerCheckpoint logic is implemented in the base StreamTask class and is only used by the source tasks. However, after the change the non-source tasks would also get triggered, with a different behavior, so we might not be able to continue using this pattern.

Best,
Yun




Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
   Hi all,

         I tested the previous PoC with the current tests and found some new issues that might cause disagreement. Sorry that this may also reverse some of the earlier conclusions:

     1. Which operators should wait for one more checkpoint before close?

        One motivation for this FLIP is to ensure that the 2PC sink commits the last part of the data before it is closed, which requires the sink operator to wait for one more checkpoint, like onEndOfInput() -> waitForCheckpoint() -> notifyCheckpointComplete() -> close(). This raises the question of which operators should wait for a checkpoint. Possible options are:
                 a. Make all operators (or UDFs) that implement the notifyCheckpointComplete method wait for one more checkpoint. One exception is that, since we can only snapshot one or all tasks of a legacy source operator to avoid data repetition [1], we could not support letting legacy source operators and their chained operators wait for checkpoints, because there would be a deadlock if some of the tasks have finished. This would finally be solved once legacy sources are deprecated. The PoC uses this option for now.
                b. Make operators (or UDFs) implement a special marker interface in order to wait for one more checkpoint.
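
The shutdown sequence and option b can be sketched as follows. This is a simplified model under assumptions: OperatorSketch, WaitsForFinalCheckpoint, CommittingSinkSketch and TaskShutdownSketch are hypothetical names, and the sketch treats any checkpoint that completes after end of input as covering all pending data, which a real implementation would have to verify against the checkpoint's trigger time.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal operator contract for the sketch. */
interface OperatorSketch {
    void notifyCheckpointComplete(long checkpointId);

    void close();
}

/** Hypothetical marker interface (option b); it is not an existing Flink interface. */
interface WaitsForFinalCheckpoint {}

/** Toy two-phase-commit sink: records become visible only after a completed checkpoint. */
final class CommittingSinkSketch implements OperatorSketch, WaitsForFinalCheckpoint {
    final List<String> pending = new ArrayList<>();
    final List<String> committed = new ArrayList<>();
    boolean closed;

    void write(String record) {
        pending.add(record);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        committed.addAll(pending);
        pending.clear();
    }

    @Override
    public void close() {
        closed = true;
    }
}

/** Drives onEndOfInput -> wait for checkpoint -> notifyCheckpointComplete -> close. */
final class TaskShutdownSketch {
    private final OperatorSketch operator;
    private boolean inputEnded;
    private boolean closed;

    TaskShutdownSketch(OperatorSketch operator) {
        this.operator = operator;
    }

    void onEndOfInput() {
        inputEnded = true;
        // Operators without the marker have nothing left to commit and close immediately.
        if (!(operator instanceof WaitsForFinalCheckpoint)) {
            closeOnce();
        }
    }

    void onCheckpointComplete(long checkpointId) {
        operator.notifyCheckpointComplete(checkpointId);
        if (inputEnded) {
            closeOnce();
        }
    }

    private void closeOnce() {
        if (!closed) {
            closed = true;
            operator.close();
        }
    }
}
```

Under option a the instanceof check would instead ask whether the operator implements notifyCheckpointComplete at all, which is why legacy sources and their chained operators need the exception described above.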


   2. Do we need to handle the case where tasks finish before being triggered?

      Previously I thought we could postpone it. However, during testing I found that it might cause some problems, since by default a checkpoint failure causes job failover, and the job would also need to wait for another interval to trigger the next checkpoint. To pass the tests, I updated the PoC to include this part, and we may want to think again about whether we need to include it or use some other option.

3. How to extend the checkpoint metadata with a new format?

    Sorry, my previous estimate was wrong. After extracting a sub-component for (de)serializing operator state, I found that the problem just moves to the new OperatorStateSerializer. The problem seems to be that v2, v3 and v4 have different fields and thus use different procedures when (de)serializing, which is a bit different from the case where we have fixed steps and each step has different logic. Thus we might either:
     a. Use a base class for each pair of versions.
     b. Or have a unified framework that contains all possible fields across all versions, and use an empty field serializer to skip some fields in each version.
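
Option b could look roughly like the sketch below: every version walks the same ordered list of field slots, and a version that does not store a field plugs in an empty serializer for that slot. StateRecord, FieldSerializer and VersionedStateFormat are hypothetical names and the three fields are a made-up subset, so this only illustrates the shape of the idea.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical record holding every field that exists in any metadata version. */
final class StateRecord {
    String operatorId = "";
    int parallelism;
    boolean finished;
}

interface FieldSerializer {
    void write(StateRecord r, DataOutputStream out) throws IOException;

    void read(StateRecord r, DataInputStream in) throws IOException;
}

final class VersionedStateFormat {
    /** Placeholder for a field the version does not store: writes and reads nothing. */
    static final FieldSerializer EMPTY = new FieldSerializer() {
        @Override public void write(StateRecord r, DataOutputStream out) {}
        @Override public void read(StateRecord r, DataInputStream in) {}
    };
    static final FieldSerializer OPERATOR_ID = new FieldSerializer() {
        @Override public void write(StateRecord r, DataOutputStream out) throws IOException {
            out.writeUTF(r.operatorId);
        }
        @Override public void read(StateRecord r, DataInputStream in) throws IOException {
            r.operatorId = in.readUTF();
        }
    };
    static final FieldSerializer PARALLELISM = new FieldSerializer() {
        @Override public void write(StateRecord r, DataOutputStream out) throws IOException {
            out.writeInt(r.parallelism);
        }
        @Override public void read(StateRecord r, DataInputStream in) throws IOException {
            r.parallelism = in.readInt();
        }
    };
    static final FieldSerializer FINISHED = new FieldSerializer() {
        @Override public void write(StateRecord r, DataOutputStream out) throws IOException {
            out.writeBoolean(r.finished);
        }
        @Override public void read(StateRecord r, DataInputStream in) throws IOException {
            r.finished = in.readBoolean();
        }
    };

    /** Same slots in the same order for every version; only the slot contents differ. */
    static FieldSerializer[] fieldsFor(int version) {
        return new FieldSerializer[] {OPERATOR_ID, PARALLELISM, version >= 4 ? FINISHED : EMPTY};
    }

    static byte[] write(int version, StateRecord r) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (FieldSerializer f : fieldsFor(version)) {
                f.write(r, out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static StateRecord read(int version, byte[] data) {
        StateRecord r = new StateRecord();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            for (FieldSerializer f : fieldsFor(version)) {
                f.read(r, in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return r;
    }
}
```

Compared with option a, adding a version here means adding one slot and one branch in fieldsFor rather than a new base class, at the cost of every version knowing about fields it never stores.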

Best,
Yun

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&Checkpointsdonotcontainthefinalstatesfromfinishedtasks




Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Arvid Heise-3
Hi Yun,

1. I'd think that this is an orthogonal issue, which I'd solve separately.
My gut feeling says that this is something we should only address for new
sinks where we decouple the semantics of commits and checkpoints
anyways. @Aljoscha
Krettek <[hidden email]> any idea on this one?

2. I'm not sure I get it completely. Let's assume we have a source
partition that is finished before the first checkpoint. Then, we would need
to store the finished state of the subtask somehow. So I'm assuming, we
still need to trigger some checkpointing code on finished subtasks.

3. Do we really want to store the finished flag in OperatorState? I was
assuming we want to have it more fine-grained on OperatorSubtaskState.
Maybe we can store the flag inside managed or raw state without changing
the format?



On Fri, Dec 25, 2020 at 8:39 AM Yun Gao <[hidden email]> wrote:

>    Hi all,
>
>          I tested the previous PoC against the existing tests and found some
> new issues that might lead to disagreement. Apologies that this also
> reverses my position on some earlier points:
>
>
>      1. Which operators should wait for one more checkpoint before closing?
>
>         One motivation for this FLIP is to ensure that a 2PC sink commits the
> last part of its data before it is closed, which means the sink operator has
> to wait for one more checkpoint, as in onEndOfInput() -> waitForCheckpoint()
> -> notifyCheckpointComplete() -> close(). This raises the question of which
> operators should wait for a checkpoint. Possible options are:
>                  a. Make every operator (or UDF) that implements the
> notifyCheckpointComplete method wait for one more checkpoint. One
> exception: since we can only snapshot one or all tasks of a legacy
> source operator to avoid data repetition [1], we could not let legacy source
> operators and their chained operators wait for checkpoints, because there
> would be a deadlock if only part of the tasks are finished. This would
> finally be solved once legacy sources are deprecated. The PoC uses this
> option for now.
>                 b. Have operators (or UDFs) implement a special marker
> interface to opt in to waiting for one more checkpoint.
>
>
>    2. Do we need to handle the case where a task finishes before it is
> triggered?
>
>       Previously I thought we could postpone this. However, during testing I
> found that it can cause problems: by default a checkpoint failure
> causes job failover, and the job would also need to wait for another
> interval before the next checkpoint is triggered. To pass the tests, I
> updated the PoC to include this part, and we may want to think again about
> whether to include it or use some other option.
>
> 3. How do we extend the checkpoint metadata format?
>
>     Sorry, my earlier estimate was wrong. After extracting a
> sub-component to (de)serialize operator state, I found that the problem just
> moves to the new OperatorStateSerializer. The problem seems to be that v2,
> v3 and v4 have different fields, so they follow different processes when
> (de)serializing, which differs from the case where we have a fixed set of
> steps and each step has different logic. Thus we might either
>      a. Use a base class for every two versions.
>      b. Or have a unified framework that contains all possible fields
> across all versions, and use an empty field serializer to skip some fields
> in each version.
>
> Best,
> Yun
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&Checkpointsdonotcontainthefinalstatesfromfinishedtasks
>
> ------------------------------------------------------------------
> From:Yun Gao <[hidden email]>
> Send Time:2020 Dec. 16 (Wed.) 11:07
> To:Aljoscha Krettek <[hidden email]>; dev <[hidden email]>;
> user <[hidden email]>
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
>      Hi Aljoscha,
>
>         Many thanks for the feedback! For the remaining issues:
>
>
>       > 1. You mean we would insert "artificial" barriers for barrier 2 in case we receive EndOfPartition while other inputs have already received barrier 2? I think that makes sense, yes.
>
>
>       Yes, exactly. I would like to insert "artificial" barriers in case we receive EndOfPartition while other inputs have already received barrier 2, and also in two similar cases: when some input channels receive EndOfPartition while checkpoint 2 is ongoing, and when the task receives a checkpoint trigger directly after all its preceding tasks have finished but before it has received their EndOfPartition.
>
>
>      > 3. This indeed seems complex. Maybe we could switch to using composition instead of inheritance to make this more extensible?
>
>
>     I re-checked the code and now think composition would be better: it avoids a complex inheritance hierarchy by exposing the changed part, `(de)serializeOperatorState`. I'll update the PoC accordingly. Many thanks for the suggestion!
>
>
>    > 4. Don't we currently have the same problem? Even right now source tasks and non-source tasks behave differently when it comes to checkpoints. Are you saying we should fix that or would the new work introduce even
> more duplicate code?
>
>
>   Currently we never trigger non-source tasks, so the triggerCheckpoint logic is implemented in the base StreamTask class and only used by the source tasks. After the change, however, non-source tasks would also be triggered, with a different behavior, so we might not be able to keep using this pattern.
>
> Best,
> Yun
>
>
> ------------------------------------------------------------------
> From:Aljoscha Krettek <[hidden email]>
> Send Time:2020 Dec. 15 (Tue.) 18:11
> To:dev <[hidden email]>
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
> Thanks for the thorough update! I'll answer inline.
>
> On 14.12.20 16:33, Yun Gao wrote:
>
> >      1. To take EndOfPartition into account for barrier alignment on the TM side, we now tend to decouple the EndOfPartition logic from the normal alignment behavior to avoid complex interference (which seems hard to reason about). We could do so by inserting suitable barriers for input channels that have received but not yet processed EndOfPartition. For example, if a task with four inputs has received barrier 2 from two input channels, but the other two inputs do not receive barrier 2 before EndOfPartition because the preceding tasks have finished, we could then insert barrier 2 for the last two channels so that we can still finish checkpoint 2.
>
> You mean we would insert "artificial" barriers for barrier 2 in case we
> receive  EndOfPartition while other inputs have already received barrier
> 2? I think that makes sense, yes.
>
>
> >      2. As we have discussed, if a task finishes while we are triggering the tasks, it would cause a checkpoint failure and we should re-trigger its descendants. But if possible, we think we might skip this issue in the first version to reduce implementation complexity, since it should not affect correctness. We could consider supporting it in later versions.
>
> I think this should be completely fine.
>
>
> >      3. We would have to add an isFinished field to OperatorState so that we do not re-run finished sources after failover. However, this would require a new version of the checkpoint metadata. Currently Flink has an abstract MetaV2V3SerializerBase, which V2 and V3 extend to share some implementation. To add V4, which differs from V3 by only one field, the current PoC wants to introduce a new MetaV3V4SerializerBase that extends MetaV2V3SerializerBase to share implementation between V3 and V4. This looks a little complex, and we might need a general mechanism for extending the checkpoint metadata format.
>
> This indeed seems complex. Maybe we could switch to using composition
> instead of inheritance to make this more extensible?
>
>
> >      4. With this change, StreamTask would have two types of subclasses depending on how they implement triggerCheckpoint: source tasks, which perform checkpoints immediately, and non-source tasks, which would notify the CheckpointBarrierHandler in some way. However, since we have multiple source tasks (legacy and new source) and multiple non-source tasks (one-input, two-input, multiple-input), multiple subclasses would share the same implementation, causing code repetition. Currently the PoC introduces a new level of abstraction, namely SourceStreamTasks and NonSourceStreamTasks, but what makes it more complicated is that StreamingIterationHead extends OneInputStreamTask yet needs to perform checkpoints the way source tasks do.
>
> Don't we currently have the same problem? Even right now source tasks
> and non-source tasks behave differently when it comes to checkpoints.
> Are you saying we should fix that or would the new work introduce even
> more duplicate code?
>
>
>

--

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
     Hi Arvid,

         Many thanks for the feedback!

         For the second issue, sorry, I think I did not make it very clear. I was initially thinking of the following case: for a job with graph A -> B -> C, when we compute which tasks to trigger, A is still running, so we trigger A to start the checkpoint. However, before the trigger message reaches A, A finishes, and the trigger message fails because the task cannot be found. If we do not handle this case, the checkpoint would fail due to timeout. By default, however, a failed checkpoint causes job failure, and we would also need to wait a full checkpoint interval for the next checkpoint. One solution would be to check all pending checkpoints when the JM is notified that A has finished, and trigger B instead.
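
To make the A -> B -> C case concrete, here is a minimal sketch of the "trigger B instead" idea. Everything in it is hypothetical: RetriggerSketch, its methods and the "checkpointId:task" log are made up for illustration and are not Flink APIs. It also assumes a simple chain; a real implementation would have to skip descendants that still have other running inputs, since those receive barriers the normal way.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical JM-side bookkeeping; none of these names are Flink APIs. */
final class RetriggerSketch {
    /** Job graph as an adjacency list: task -> direct downstream tasks. */
    private final Map<String, List<String>> downstream;
    private final Set<String> finished = new HashSet<>();
    /** checkpointId -> tasks whose trigger message has not been confirmed yet. */
    private final Map<Long, Set<String>> unconfirmedTriggers = new HashMap<>();
    /** Log of "checkpointId:task" trigger messages, in send order. */
    final List<String> sentTriggers = new ArrayList<>();

    RetriggerSketch(Map<String, List<String>> downstream) {
        this.downstream = downstream;
    }

    void startCheckpoint(long checkpointId, List<String> tasksToTrigger) {
        unconfirmedTriggers.put(checkpointId, new HashSet<>());
        for (String task : tasksToTrigger) {
            send(checkpointId, task);
        }
    }

    void onTriggerConfirmed(long checkpointId, String task) {
        Set<String> pending = unconfirmedTriggers.get(checkpointId);
        if (pending != null) {
            pending.remove(task);
        }
    }

    /** Called when the JM learns that a task has finished. */
    void onTaskFinished(String task) {
        finished.add(task);
        for (Map.Entry<Long, Set<String>> entry : unconfirmedTriggers.entrySet()) {
            // The trigger for this checkpoint was lost: move it to the descendants.
            if (entry.getValue().remove(task)) {
                retarget(entry.getKey(), task);
            }
        }
    }

    private void retarget(long checkpointId, String finishedTask) {
        for (String next : downstream.getOrDefault(finishedTask, Collections.emptyList())) {
            if (finished.contains(next)) {
                retarget(checkpointId, next); // skip over finished descendants
            } else {
                send(checkpointId, next);
            }
        }
    }

    private void send(long checkpointId, String task) {
        // Only send once per checkpoint and task.
        if (unconfirmedTriggers.get(checkpointId).add(task)) {
            sentTriggers.add(checkpointId + ":" + task);
        }
    }
}
```

With the graph A -> B -> C, starting checkpoint 1 on A and then reporting A as finished before it confirms the trigger leaves the log with a trigger for A followed by one for B, so the checkpoint does not have to time out.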

       For the third issue, it should work if we store a special value for some field in OperatorState or OperatorSubtaskState. For example, we might store a special subtaskState map inside the OperatorState to mark it as finished, since a finished operator should always have an empty state. Many thanks for the advice! I'll try this method.
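
A rough illustration of the "special value" idea, under loud assumptions: OperatorStateSketch is a made-up stand-in and not Flink's OperatorState, and the sentinel subtask index -1 is only one possible encoding that I picked for the example. The point is that a finished operator can be recognised from the existing map shape, without a new field in the metadata format.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an operator's checkpointed state; not Flink's OperatorState. */
final class OperatorStateSketch {
    /** Assumed sentinel: a subtask index that can never occur in a real job. */
    static final int FINISHED_MARKER_INDEX = -1;

    private final Map<Integer, byte[]> subtaskStates;

    private OperatorStateSketch(Map<Integer, byte[]> subtaskStates) {
        this.subtaskStates = subtaskStates;
    }

    /** State of an operator that still has running subtasks. */
    static OperatorStateSketch running(Map<Integer, byte[]> subtaskStates) {
        return new OperatorStateSketch(new HashMap<>(subtaskStates));
    }

    /** A finished operator has no real state, so the map only carries the marker. */
    static OperatorStateSketch finished() {
        return new OperatorStateSketch(
                Collections.singletonMap(FINISHED_MARKER_INDEX, new byte[0]));
    }

    boolean isFinished() {
        return subtaskStates.size() == 1 && subtaskStates.containsKey(FINISHED_MARKER_INDEX);
    }
}
```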

Best,
 Yun





Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Aljoscha Krettek-2
In reply to this post by Arvid Heise-3
On 2021/01/05 10:16, Arvid Heise wrote:
>1. I'd think that this is an orthogonal issue, which I'd solve separately.
>My gut feeling says that this is something we should only address for new
>sinks where we decouple the semantics of commits and checkpoints
>anyways. @Aljoscha
>Krettek <[hidden email]> any idea on this one?

I also think it's somewhat orthogonal, let's see where we land here once
the other issues are hammered out.

>2. I'm not sure I get it completely. Let's assume we have a source
>partition that is finished before the first checkpoint. Then, we would need
>to store the finished state of the subtask somehow. So I'm assuming, we
>still need to trigger some checkpointing code on finished subtasks.

What he's talking about here is the race condition between a) checkpoint
coordinator decides to do a checkpoint and b) a source operator shuts
down.

Normally, the checkpoint coordinator only needs to trigger sources, and
not intermediate operators. When we allow sources to shut down,
intermediate operators now can become the "head" of a pipeline and
become the things that need to be triggered.

One thought here is this: will there ever be intermediate operators that
should be running that are not connected to at least one source? The
only case I can think of right now is async I/O. Or are there others? If
we think that there will never be intermediate operators that are not
connected to at least one source we might come up with a simpler
solution.

>3. Do we really want to store the finished flag in OperatorState? I was
>assuming we want to have it more fine-grained on OperatorSubtaskState.
>Maybe we can store the flag inside managed or raw state without changing
>the format?

I think we cannot store it in `OperatorSubtaskState` because of how
operator state (the actual `ListState` that operators use) is reshuffled
on restore to all operators. So normally it doesn't make sense to say
that one of the subtasks is done when operator state is involved. Only
when all subtasks are done can we record this operator as done, I think.
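
A small sketch of why a per-subtask flag is problematic. It models redistribution as a plain round-robin over all checkpointed list items, which is a simplification I am assuming for illustration and not Flink's exact algorithm; ListStateRedistribution is a made-up name.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of redistributing union of per-subtask list state on restore. */
final class ListStateRedistribution {
    static List<List<String>> redistribute(List<List<String>> checkpointed, int parallelism) {
        // Pool the items of all subtasks; which subtask wrote them is lost here.
        List<String> all = new ArrayList<>();
        for (List<String> subtaskItems : checkpointed) {
            all.addAll(subtaskItems);
        }
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            restored.add(new ArrayList<>());
        }
        // Hand the pooled items out round-robin to the restored subtasks.
        for (int i = 0; i < all.size(); i++) {
            restored.get(i % parallelism).add(all.get(i));
        }
        return restored;
    }
}
```

If subtask 0 finished with an empty list and subtask 1 checkpointed two items, restoring with parallelism 2 gives each subtask one item. Subtask 0 has work again after restore, so "subtask 0 is finished" cannot be carried over.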

Best,
Aljoscha

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
     Hi Aljoscha,

         Many thanks for the feedback!

         For the second issue, I was indeed thinking of the race condition between deciding to trigger and the operator finishing. And on this point,
 > One thought here is this: will there ever be intermediate operators that
> should be running that are not connected to at least one source? The
> only case I can think of right now is async I/O. Or are there others? If
> we think that there will never be intermediate operators that are not
> connected to at least one source we might come up with a simpler
> solution.
     I think there are still cases where intermediate operators keep running after all their sources have finished, for example source -> sink writer -> sink committer -> sink global committer. Since the sink committer needs to wait for one more checkpoint between endOfInput and close, it would continue to run after the source and sink writer have finished, until one checkpoint completes. And since the four operators could also be chained in one task, we may also need to consider the case where some of the operators are already finished when taking a snapshot of the task.
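
As a sketch of what "wait for one more checkpoint between endOfInput and close" could look like for the committer: FinalCheckpointWaiter below is hypothetical and only borrows the method names endOfInput and notifyCheckpointComplete from the discussion. It assumes checkpoint IDs increase monotonically and that only a checkpoint triggered after endOfInput covers the last data.

```java
/** Hypothetical helper; tracks when an operator may close after its input has ended. */
final class FinalCheckpointWaiter {
    private long lastTriggeredCheckpoint = 0;
    /** First checkpoint that is guaranteed to contain the last records. */
    private long firstCoveringCheckpoint = Long.MAX_VALUE;
    private boolean closed;

    void onCheckpointTriggered(long checkpointId) {
        lastTriggeredCheckpoint = Math.max(lastTriggeredCheckpoint, checkpointId);
    }

    void endOfInput() {
        // Checkpoints triggered before this point may miss the last records.
        firstCoveringCheckpoint = lastTriggeredCheckpoint + 1;
    }

    void notifyCheckpointComplete(long checkpointId) {
        if (checkpointId >= firstCoveringCheckpoint) {
            closed = true; // the last records are committed, safe to close
        }
    }

    boolean isClosed() {
        return closed;
    }
}
```

If checkpoint 1 was triggered before endOfInput, its completion does not close the operator; only the completion of checkpoint 2 does.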

   Best,
    Yun





Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Arvid Heise-3
For 2) the race condition, I was more thinking of still injecting the
barrier at the source in all cases, but having some kind of short-cut to
immediately execute the RPC inside the respective taskmanager. However,
that may prove hard in case of dynamic scale-ins. Nevertheless, because of
this race condition, we should still take some time to think about it as it
effectively means we need to support handling a barrier in a finished task
anyways. Maybe a finished task is still assigned to a TM with JM as a
fallback?

For your question: will there ever be intermediate operators that should be
running that are not connected to at least one source?
I think there are plenty of examples if you go beyond chained operators and
fully connected exchanges. Think of any fan-in: let's assume you have
sources S1...S4, with S1+S2->M1, and S3+S4->M2. If S1 is finished, S2 and M1
are still running. Or I didn't get your question ;).
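
To play through the fan-in example, here is a minimal sketch of one possible rule for picking the tasks to trigger: every running task whose inputs have all finished (sources have no inputs, so a running source always qualifies). The rule and the TriggerPlanner name are my assumptions for illustration, not something the FLIP has settled on.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical planner: which tasks act as "heads" for the next checkpoint? */
final class TriggerPlanner {
    /**
     * @param inputsOf task -> its direct upstream tasks (empty list for sources)
     * @param finished tasks that have already finished
     */
    static Set<String> tasksToTrigger(Map<String, List<String>> inputsOf, Set<String> finished) {
        Set<String> toTrigger = new TreeSet<>();
        for (Map.Entry<String, List<String>> entry : inputsOf.entrySet()) {
            boolean running = !finished.contains(entry.getKey());
            boolean allInputsFinished = finished.containsAll(entry.getValue());
            if (running && allInputsFinished) {
                toTrigger.add(entry.getKey());
            }
        }
        return toTrigger;
    }
}
```

With only S1 finished, this selects S2, S3 and S4; M1 still gets its barrier from S2. Once S1 and S2 are both finished, M1 itself becomes a head and is selected together with S3 and S4.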


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
     Hi Arvid,

          Many thanks for the feedback!

         > For 2) the race condition, I was more thinking of still injecting the
         > barrier at the source in all cases, but having some kind of short-cut to
         > immediately execute the RPC inside the respective taskmanager. However,
         > that may prove hard in case of dynamic scale-ins. Nevertheless, because of
         > this race condition, we should still take some time to think about it as it
         > effectively means we need to support handling a barrier in a finished task
         > anyways. Maybe a finished task is still assigned to a TM with JM as a
         > fallback?
    For faked finished tasks, I have some concerns. If the faked finished tasks reside on the JM side, there would still be a race condition between triggering and tasks finishing; if they reside on the TM side, the scheduler would have to keep considering these tasks when failover happens.
    Besides, we would also need to keep the channels between the faked finished tasks and normal tasks open to pass the checkpoint barriers. This conflicts with the current task lifecycle, since we would still need to keep channels open and send messages after EndOfPartition has been sent. If we have mixed jobs with both bounded and unbounded sources, the remaining network channels would never get a chance to be closed.

   Best,
    Yun


------------------------------------------------------------------
From:Arvid Heise <[hidden email]>
Send Time:2021 Jan. 6 (Wed.) 00:28
To:Yun Gao <[hidden email]>
Cc:Aljoscha Krettek <[hidden email]>; dev <[hidden email]>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

For 2) the race condition, I was more thinking of still injecting the
barrier at the source in all cases, but having some kind of short-cut to
immediately execute the RPC inside the respective taskmanager. However,
that may prove hard in case of dynamic scale-ins. Nevertheless, because of
this race condition, we should still take some time to think about it as it
effectively means we need to support handling a barrier in a finished task
anyways. Maybe a finished task is still assigned to a TM with JM as a
fallback?

For your question: will there ever be intermediate operators that should be
running that are not connected to at least one source?
I think there are plenty of examples if you go beyond chained operators and
fully connected exchanges. Think of any fan-in, let's assume you have
source S1...S4, with S1+S2->M1, and S3+S4->M2. If S1 is finished, S2 and M1
are still running. Or I didn't get your question ;).

On Tue, Jan 5, 2021 at 5:00 PM Yun Gao <[hidden email]> wrote:

>      Hi Aljoscha,
>
>          Many thanks for the feedback!
>
>          For the second issue, I'm indeed thinking of the race condition
> between deciding to trigger and the operator finishing. And for this point,
>
> > One thought here is this: will there ever be intermediate operators that
> > should be running that are not connected to at least one source? The
> > only case I can think of right now is async I/O. Or are there others? If
> > we think that there will never be intermediate operators that are not
> > connected to at least one source we might come up with a simpler
> > solution.
>
>      I think there are still cases where an intermediate operator keeps
> running after all its sources have finished, for example, source -> sink
> writer -> sink committer -> sink global committer. Since the sink committer
> needs to wait for one more checkpoint between endOfInput and close, it
> would continue to run after the source and sink writer are finished, until
> we could finish one checkpoint. And since the four operators could also be
> chained in one task, we may also need to consider the case that part of the
> operators are finished when taking a snapshot of the tasks.
>
>    Best,
>     Yun
>
>
> ------------------------------------------------------------------
> From:Aljoscha Krettek <[hidden email]>
> Send Time:2021 Jan. 5 (Tue.) 22:34
> To:dev <[hidden email]>
> Cc:Yun Gao <[hidden email]>
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
> On 2021/01/05 10:16, Arvid Heise wrote:
> >1. I'd think that this is an orthogonal issue, which I'd solve separately.
> >My gut feeling says that this is something we should only address for new
> >sinks where we decouple the semantics of commits and checkpoints
> >anyways. @Aljoscha
> >Krettek <[hidden email]> any idea on this one?
>
> I also think it's somewhat orthogonal, let's see where we land here once
> the other issues are hammered out.
>
> >2. I'm not sure I get it completely. Let's assume we have a source
> >partition that is finished before the first checkpoint. Then, we would need
> >to store the finished state of the subtask somehow. So I'm assuming, we
> >still need to trigger some checkpointing code on finished subtasks.
>
> What he's talking about here is the race condition between a) checkpoint
> coordinator decides to do a checkpoint and b) a source operator shuts
> down.
>
> Normally, the checkpoint coordinator only needs to trigger sources, and
> not intermediate operators. When we allow sources to shut down,
> intermediate operators now can become the "head" of a pipeline and
> become the things that need to be triggered.
>
> One thought here is this: will there ever be intermediate operators that
> should be running that are not connected to at least one source? The
> only case I can think of right now is async I/O. Or are there others? If
> we think that there will never be intermediate operators that are not
> connected to at least one source we might come up with a simpler
> solution.
>
> >3. Do we really want to store the finished flag in OperatorState? I was
> >assuming we want to have it more fine-grained on OperatorSubtaskState.
> >Maybe we can store the flag inside managed or raw state without changing
> >the format?
>
> I think we cannot store it in `OperatorSubtaskState` because of how
> operator state (the actual `ListState` that operators use) is reshuffled
> on restore to all operators. So normally it doesn't make sense to say
> that one of the subtasks is done when operator state is involved. Only
> when all subtasks are done can we record this operator as done, I think.
>
> Best,
> Aljoscha
>
>
>



Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Aljoscha Krettek-2
In reply to this post by Arvid Heise-3
On 2021/01/05 17:27, Arvid Heise wrote:
>For your question: will there ever be intermediate operators that should be
>running that are not connected to at least one source?
>I think there are plenty of examples if you go beyond chained operators and
>fully connected exchanges. Think of any fan-in, let's assume you have
>source S1...S4, with S1+S2->M1, and S3+S4->M2. If S1 is finished, S2 and M1
>are still running. Or I didn't get your question ;).

I was referring to the case where intermediate operators don't have any
active upstream (input) operators. In that case, they basically become
the "source" of that part of the graph. In your example, M1 is still
connected to a "real" source.
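
To make the distinction concrete, here is a minimal sketch of how one might
compute which tasks would have to be triggered for a checkpoint once some
tasks have finished. It is not Flink code: the class TriggerTargets, the
method find, and the string task names are all hypothetical, and the
"finished" set is treated as a fixed snapshot, so the race between triggering
and finishing is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper, not part of Flink: picks the tasks a checkpoint would have to trigger. */
final class TriggerTargets {

    /**
     * @param inputs   for every task, the names of its direct upstream tasks (empty for sources)
     * @param finished tasks that are known to have finished
     * @return all running tasks that have no running upstream task, sorted by name
     */
    static List<String> find(Map<String, List<String>> inputs, Set<String> finished) {
        Set<String> targets = new TreeSet<>();
        for (Map.Entry<String, List<String>> entry : inputs.entrySet()) {
            String task = entry.getKey();
            if (finished.contains(task)) {
                continue; // finished tasks are never triggered in this sketch
            }
            boolean hasRunningUpstream = false;
            for (String upstream : entry.getValue()) {
                if (!finished.contains(upstream)) {
                    hasRunningUpstream = true;
                    break;
                }
            }
            if (!hasRunningUpstream) {
                // Either a running source, or an intermediate task that has
                // become the "head" of its part of the graph.
                targets.add(task);
            }
        }
        return new ArrayList<>(targets);
    }
}
```

With the fan-in topology from the quoted example (S1+S2->M1, S3+S4->M2), a
finished set of {S1} yields [S2, S3, S4], because M1 is still fed by the
running source S2. Only when both S1 and S2 are finished does M1 show up in
the result, as [M1, S3, S4].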

Best,
Aljoscha

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Arvid Heise-3
>
> I was referring to the case where intermediate operators don't have any
> active upstream (input) operators. In that case, they basically become
> the "source" of that part of the graph. In your example, M1 is still
> connected to a "real" source.


I'm assuming that this is the normal case. In an A->B graph, as soon as A
finishes, B still has a couple of input buffers to process. If you add
backpressure or longer pipelines into the mix, it's quite likely that a
checkpoint may occur with B being the head.

> For faked finished tasks, I have some concerns. If the faked finished
> tasks reside on the JM side, there would still be a race condition between
> triggering the checkpoint and the tasks finishing. If they reside on the
> TM side, the scheduler would have to keep considering these tasks when a
> failover happens.
> Besides, we would also need to keep the channels between the faked
> finished tasks and the normal tasks open to pass the checkpoint barriers.
> This conflicts with the current task lifecycle, since we would have to
> keep channels open and send messages after EndOfPartition has been sent.
> If we have mixed jobs with both bounded and unbounded sources, the
> remaining network channels would never get a chance to be closed.

These are all valid concerns, but I fear that if we don't find a solution to
them, we will not have a reliable system (cancelling checkpoints when
encountering this race condition at a higher degree of parallelism).

Let me clarify that faked finished tasks should reside on the TM where they
previously lived. Only through some kind of job stealing, which would be
necessary for dynamic rescaling, might they end up in the JM (that's all far
down the road).

I have not thought about channels, but I think you are right that channels
should be strictly bound to the life-cycle of a subtask. The question is
whether fake finished tasks need to use channels at all. We could also relay
RPC calls from TM to TM.

For all practical purposes of checkpoint barrier alignment, EndOfPartition
should cause the channel to be excluded from alignment (the respective
channel has implicitly received all future barriers).
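
As a rough illustration of that last point, the following sketch tracks
alignment for a single checkpoint and simply stops waiting for channels that
have delivered EndOfPartition. The class SimpleBarrierAligner and its methods
are hypothetical and heavily simplified; Flink's actual barrier handling deals
with multiple in-flight checkpoints, buffering, and cancellation, none of
which is modelled here.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified aligner for one checkpoint; not Flink's implementation. */
final class SimpleBarrierAligner {
    private final int numChannels;
    private final Set<Integer> finishedChannels = new HashSet<>();
    private final Set<Integer> barrierSeen = new HashSet<>();

    SimpleBarrierAligner(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Records a barrier on the given channel; returns true once the checkpoint is aligned. */
    boolean onBarrier(int channel) {
        if (!finishedChannels.contains(channel)) {
            barrierSeen.add(channel);
        }
        return isAligned();
    }

    /** Records EndOfPartition; the channel is excluded from alignment from now on. */
    boolean onEndOfPartition(int channel) {
        finishedChannels.add(channel);
        barrierSeen.remove(channel);
        return isAligned();
    }

    /** Aligned when every channel that is still open has delivered its barrier. */
    boolean isAligned() {
        int openChannels = numChannels - finishedChannels.size();
        return openChannels > 0 && barrierSeen.size() == openChannels;
    }
}
```

For example, with three input channels, barriers on channels 0 and 1 do not
complete alignment on their own, but an EndOfPartition on channel 2 does,
because the aligner no longer waits for that channel. The sketch treats a task
with no open channels as not aligned, which is just one possible choice for
that edge case.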

On Wed, Jan 6, 2021 at 10:46 AM Aljoscha Krettek <[hidden email]>
wrote:

> On 2021/01/05 17:27, Arvid Heise wrote:
> >For your question: will there ever be intermediate operators that should
> >be running that are not connected to at least one source?
> >I think there are plenty of examples if you go beyond chained operators
> >and fully connected exchanges. Think of any fan-in, let's assume you have
> >source S1...S4, with S1+S2->M1, and S3+S4->M2. If S1 is finished, S2 and
> >M1 are still running. Or I didn't get your question ;).
>
> I was referring to the case where intermediate operators don't have any
> active upstream (input) operators. In that case, they basically become
> the "source" of that part of the graph. In your example, M1 is still
> connected to a "real" source.
>
> Best,
> Aljoscha
>

