On 2021/01/06 11:30, Arvid Heise wrote:
> I'm assuming that this is the normal case. In an A->B graph, as soon as A
> finishes, B still has a couple of input buffers to process. If you add
> backpressure or longer pipelines into the mix, it's quite likely that a
> checkpoint may occur with B being the head.

Ahh, I think I know what you mean. This can happen when the checkpoint coordinator issues concurrent checkpoints without waiting for older ones to finish. My head is mostly operating under the premise that there is at most one concurrent checkpoint.

In the current code base the race conditions that Yun and I are talking about cannot occur. Checkpoints can only be triggered at sources and will then travel through the graph. Intermediate operators are never directly triggered from the JobManager/CheckpointCoordinator.

When sources start to shut down, the JM has to directly inject/trigger checkpoints at the now new "sources" of the graph, which have previously been intermediate operators.

I want to repeat that I have a suspicion that maybe this is a degenerate case and we never want to allow operators to be doing checkpoints when they are not connected to at least one running source. Which means that we have to find a solution for declined checkpoints and missing sources.

I'll first show an example where I think we will never have intermediate operators running without the sources being running:

Source -> Map -> Sink

Here, when the Source does its final checkpoint and then shuts down, that same final checkpoint would travel downstream ahead of the EOF, which would in turn cause Map and Sink to also shut down. *We can't have the case that Map is still running when we want to take a checkpoint and Source is not running.*

A similar case is this one:

Source1 --+
          |-> Map -> Sink
Source2 --+

Here, if Source1 is finished but Source2 is not, Map is still connected to at least one upstream source that is still running. Again, Map would never be running and doing checkpoints if neither Source1 nor Source2 is online.

The cases I see where intermediate operators would keep running despite not being connected to any upstream operators are when we purposefully keep an operator online despite all inputs having seen EOF. One example is async I/O, another is what Yun mentioned where a sink might want to wait for another checkpoint to confirm some data. Example:

Source -> Async I/O -> Sink

Here, Async I/O will stay online as long as there are some scheduled requests outstanding, even when the Source has shut down. In those cases, the checkpoint coordinator would have to trigger new checkpoints at Async I/O and not Source, because it has become the new "head" of the graph.

For Async I/O at least, we could say that the operator will wait for all outstanding requests to finish before it allows the final checkpoint and passes the barrier forward.

Best,
Aljoscha
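To make the "new heads of the graph" idea concrete, here is a minimal sketch of how a coordinator could compute which subtasks to trigger directly once some subtasks have finished. The class and method names are hypothetical, not Flink's actual CheckpointCoordinator API; it is only meant to illustrate the selection rule discussed above.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: which subtasks must the coordinator trigger directly? */
final class TriggerPlanner {

    /**
     * A subtask is a checkpoint "head" if it is still running and none of its
     * upstream subtasks are running anymore (original sources included, since
     * they have no upstreams at all).
     */
    static Set<String> computeTriggerHeads(
            Map<String, List<String>> upstreamsBySubtask, Set<String> finishedSubtasks) {

        Set<String> heads = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : upstreamsBySubtask.entrySet()) {
            String subtask = entry.getKey();
            if (finishedSubtasks.contains(subtask)) {
                continue; // finished subtasks are never triggered
            }
            boolean hasRunningUpstream =
                    entry.getValue().stream().anyMatch(up -> !finishedSubtasks.contains(up));
            if (!hasRunningUpstream) {
                heads.add(subtask); // a source, or an operator whose inputs have all finished
            }
        }
        return heads;
    }
}
```

Under this rule, in the Source -> Async I/O -> Sink example above, Async I/O becomes a head exactly when the Source reports FINISHED.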
I was actually not thinking about concurrent checkpoints (and actually want to get rid of them once UC is established, since they are addressing the same thing).

But your explanation definitely helped me to better understand the race condition.

However, I have the impression that you think mostly in terms of tasks and I mostly think in terms of subtasks. I especially want to have proper support for bounded sources where one partition is much larger than the other partitions (might be in conjunction with unbounded sources such that checkpointing is plausible to begin with). Hence, most of the subtasks are finished with one straggler remaining. In this case, the barriers are now inserted only in the straggling source subtask and potentially in any running downstream subtask. As far as I have understood, this would require barriers to be inserted downstream, leading to similar race conditions.

I'm also concerned about the notion of a final checkpoint. What happens when this final checkpoint times out (checkpoint timeout > async timeout) or fails for a different reason? I'm currently more inclined to just let checkpoints work until the whole graph is completed (and thought this was the initial goal of the whole FLIP to begin with). However, that would require subtasks to stay alive until they receive the checkpointCompleted callback (which is currently also not guaranteed)...
Hi Arvid,
Thanks a lot for the feedback! I'll try to answer the questions inline:

> I'm also concerned about the notion of a final checkpoint. What happens
> when this final checkpoint times out (checkpoint timeout > async timeout)
> or fails for a different reason? I'm currently more inclined to just let
> checkpoints work until the whole graph is completed (and thought this was
> the initial goal of the whole FLIP to begin with).

I think we are still on the same page that we would like to trigger checkpoints periodically until the whole job is finished. In general we do not have to force a checkpoint to be aligned with a subtask finishing: an operator might, for example, have the lifecycle "take one checkpoint -> emit some records -> take another checkpoint -> emit more records -> finish", and it does not have to wait for one more checkpoint before finishing. The second checkpoint just happens to be the "final" checkpoint of this operator. The only exception is a sink operator that must wait for one more checkpoint to commit the last piece of data before finishing; this kind of operator would be dealt with separately and forced to wait for a checkpoint before finishing.

> However, I have the impression that you think mostly in terms of tasks and
> I mostly think in terms of subtasks. I especially want to have proper
> support for bounded sources where one partition is much larger than the
> other partitions (might be in conjunction with unbounded sources such that
> checkpointing is plausible to begin with). Hence, most of the subtasks are
> finished with one straggler remaining. In this case, the barriers are
> inserted now only in the straggling source subtask and potentially in any
> running downstream subtask.
> As far as I have understood, this would require barriers to be inserted
> downstream leading to similar race conditions.

I might not fully understand the issue, but I'd like to further detail the expected process here:

Source (subtask 0) ---+
                      |
Source (subtask 1) ---+--> Async I/O (subtask 0) -> Sink (subtask 0)
                      |
Source (subtask 2) ---+

The Async I/O subtask would have three input channels.

Case 1) Suppose source subtasks 0 and 1 are finished and the Async I/O subtask has received EndOfPartition from the corresponding channels. Now we happen to trigger a checkpoint; in the remaining execution graph, source subtask 2 is the "source" of the graph. We would therefore trigger source subtask 2 to start the checkpoint; it takes a snapshot and emits barriers to the Async I/O subtask. The Async I/O subtask would find that 2/3 of its input channels have already received EndOfPartition and that it has received the barrier from the remaining channel, so it knows the barriers are aligned; it then takes its snapshot and emits the barrier to the sink subtasks.

Case 2) Suppose the job continues to run and now source subtask 2 is also finished and we are going to take another checkpoint. We would then find that in the remaining execution graph the new "source" is the Async I/O subtask, so we would trigger the Async I/O subtask instead (this is different from the current implementation). The Async I/O subtask receives the trigger, takes its snapshot and emits the barrier to the following sink subtask. (Of course, the Async I/O subtask needs some way to wait until it has received EndOfPartition from all input channels before taking the snapshot, to stay consistent, but I think we can ignore the implementation details for now.)
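A minimal sketch of the alignment rule in case 1: channels that have already delivered EndOfPartition are simply excluded from the set of channels the barrier must still arrive on. The class and method names below are hypothetical, not Flink's actual CheckpointBarrierHandler.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical barrier alignment that ignores channels which already saw EndOfPartition. */
final class BarrierAligner {

    private final int numChannels;
    private final Set<Integer> finishedChannels = new HashSet<>(); // received EndOfPartition
    private final Set<Integer> barrierChannels = new HashSet<>();  // received the barrier

    BarrierAligner(int numChannels) {
        this.numChannels = numChannels;
    }

    void onEndOfPartition(int channel) {
        finishedChannels.add(channel);
    }

    /** Returns true when the barrier is aligned and the snapshot can be taken. */
    boolean onBarrier(int channel) {
        barrierChannels.add(channel);
        // Only channels that are still open need to deliver the barrier.
        return barrierChannels.size() + finishedChannels.size() >= numChannels;
    }
}
```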
For the race condition, it might happen if:
a) in case 1, the CheckpointCoordinator triggers source subtask 2, but source subtask 2 reports FINISHED before the trigger RPC reaches its TaskManager;
b) in case 2, the CheckpointCoordinator triggers the Async I/O subtask, but that subtask reports FINISHED before the trigger RPC reaches its TaskManager.

In these cases, if we do not handle them specially, then based on the current implementation the trigger RPC would simply be ignored and the checkpoint would eventually fail due to timeout, since no task would report its state. But we would be able to remedy this checkpoint: the source subtask 2 and the Async I/O subtask report their FINISHED status to the JobMaster after we tried to trigger them and before they reported a snapshot for this checkpoint. The CheckpointCoordinator would listen for this notification; when it receives it, it would iterate over its pending checkpoints to see whether it triggered this task but received FINISHED before its snapshot. If so, it would recompute the subtasks to trigger and re-trigger the following tasks. Of course this is one possible implementation and we might have other solutions to this problem. Do you think the process would still have some problems?

> However, that would
> require subtasks to stay alive until they receive the checkpointCompleted
> callback (which is currently also not guaranteed)

With the above process, I think a task would not need to wait for the checkpointCompleted callback: if it has finished, the above process would try to trigger its following tasks.

Best,
Yun
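To make the re-triggering remedy described above concrete, here is a rough sketch of the coordinator-side bookkeeping. The class and method names are simplified stand-ins, not Flink's actual CheckpointCoordinator or PendingCheckpoint implementation.

```java
import java.util.Set;

/** Hypothetical bookkeeping for the trigger/finish race described above. */
final class PendingCheckpointSketch {

    private final Set<String> triggeredButNotSnapshotted; // triggered subtasks with no snapshot yet

    PendingCheckpointSketch(Set<String> triggeredButNotSnapshotted) {
        this.triggeredButNotSnapshotted = triggeredButNotSnapshotted;
    }

    /**
     * Called when the JobMaster observes a subtask switching to FINISHED. If that
     * subtask was triggered but never reported a snapshot, the coordinator recomputes
     * the set of subtasks to trigger and re-triggers the following tasks.
     */
    void onSubtaskFinished(String subtask, CoordinatorLike coordinator) {
        if (triggeredButNotSnapshotted.remove(subtask)) {
            coordinator.recomputeAndRetrigger(this);
        }
    }

    /** Minimal stand-in for the coordinator operations this sketch needs. */
    interface CoordinatorLike {
        void recomputeAndRetrigger(PendingCheckpointSketch checkpoint);
    }
}
```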
Hi Yun,
thanks for the detailed example. It feels like Aljoscha and you are also not fully aligned yet. For me, it sounded as if Aljoscha would like to avoid sending RPCs to non-source subtasks.

> I think we are still on the same page that we would like to trigger
> checkpoints periodically until the whole job is finished. In general we do
> not have to force a checkpoint to be aligned with a subtask finishing [...]
> The second checkpoint just happens to be the "final" checkpoint of this
> operator. The only exception is a sink operator that must wait for one
> more checkpoint to commit the last piece of data before finishing; this
> kind of operator would be dealt with separately and forced to wait for a
> checkpoint before finishing.

Yes, that sounds good. I was concerned about any "holding back" of barriers in async I/O. I'd just hold back the EOP until all async threads have finished and forward barriers in the normal way. That would then also be my solution for sinks - hold back the EOP (=finish) until the checkpoint is done. My concern here is still that we would need a reliable mechanism for notifying checkpoint completion. Maybe we can use the GlobalCommitter?

> In these cases, if we do not handle them specially, then based on the
> current implementation the trigger RPC would simply be ignored and the
> checkpoint would eventually fail due to timeout, since no task would
> report its state. But we would be able to remedy this checkpoint: the
> source subtask 2 and the Async I/O subtask report their FINISHED status to
> the JobMaster after we tried to trigger them and before they reported a
> snapshot for this checkpoint. The CheckpointCoordinator would listen for
> this notification; when it receives it, it would iterate over its pending
> checkpoints to see whether it triggered this task but received FINISHED
> before its snapshot. If so, it would recompute the subtasks to trigger and
> re-trigger the following tasks. Of course this is one possible
> implementation and we might have other solutions to this problem. Do you
> think the process would still have some problems?

Here I'm just concerned that we would overload the JM, especially if it's cascading: A is triggered in A->B->C but finishes, the JM computes B and resends the RPC, but at that time B is also finished. Hence I was thinking of using TMs instead and only falling back to the JM if the TM has exited.
In reply to this post by Arvid Heise-3
On 2021/01/06 13:35, Arvid Heise wrote:
> I was actually not thinking about concurrent checkpoints (and actually want
> to get rid of them once UC is established, since they are addressing the
> same thing).

I would give a yuge +1 to that. I don't see why we would need concurrent checkpoints in most cases. (Any case, even?)

> However, I have the impression that you think mostly in terms of tasks and
> I mostly think in terms of subtasks. I especially want to have proper
> support for bounded sources where one partition is much larger than the
> other partitions (might be in conjunction with unbounded sources such that
> checkpointing is plausible to begin with). Hence, most of the subtasks are
> finished with one straggler remaining. In this case, the barriers are
> inserted now only in the straggling source subtask and potentially in any
> running downstream subtask.
> As far as I have understood, this would require barriers to be inserted
> downstream leading to similar race conditions.

No, I'm also thinking in terms of subtasks when it comes to triggering. As long as a subtask has at least one upstream task, we don't need to manually trigger that task. A task will know which of its inputs have finished, so it will take those out of the calculation that waits for barriers from all upstream tasks. In the case where only a single upstream source is remaining, the barriers from that task will then trigger checkpointing at the downstream task.

> I'm also concerned about the notion of a final checkpoint. What happens
> when this final checkpoint times out (checkpoint timeout > async timeout)
> or fails for a different reason? I'm currently more inclined to just let
> checkpoints work until the whole graph is completed (and thought this was
> the initial goal of the whole FLIP to begin with). However, that would
> require subtasks to stay alive until they receive the checkpointCompleted
> callback (which is currently also not guaranteed)...

The idea is that the final checkpoint is whatever checkpoint succeeds in the end. When a task (and I mostly mean subtask when I say task) knows that it is done, it waits for the next successful checkpoint and then shuts down.

This is a basic question, though: should we simply keep all tasks (subtasks) around forever until the whole graph shuts down? Our answer for this was *no*, so far. We would like to allow tasks to shut down, so that the resources are freed at that point.

Best,
Aljoscha
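A toy sketch of that lifecycle (finish input, wait for the next successful checkpoint, then shut down). It uses a simplified callback rather than Flink's actual CheckpointListener interface, so all names here are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;

/** Toy model of "finish input, wait for one successful checkpoint, then shut down". */
final class FinishingSubtask {

    private final CompletableFuture<Long> finalCheckpoint = new CompletableFuture<>();
    private volatile boolean inputFinished;

    /** Called when the subtask has consumed all input (EOF on every channel). */
    void onInputFinished() {
        inputFinished = true;
    }

    /** Framework callback for a completed checkpoint (akin to notifyCheckpointComplete). */
    void onCheckpointComplete(long checkpointId) {
        if (inputFinished) {
            // Whatever checkpoint completes first after the input finished is the "final" one.
            finalCheckpoint.complete(checkpointId);
        }
    }

    /** Blocks until the final checkpoint completes; afterwards resources can be released. */
    long awaitFinalCheckpointAndShutdown() throws Exception {
        return finalCheckpoint.get();
    }
}
```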
In reply to this post by Arvid Heise-3
On 2021/01/06 16:05, Arvid Heise wrote:
> thanks for the detailed example. It feels like Aljoscha and you are also
> not fully aligned yet. For me, it sounded as if Aljoscha would like to
> avoid sending RPCs to non-source subtasks.

No, I think we need the triggering of intermediate operators.

I was just thinking out loud about the potential scenarios where intermediate operators will in fact stay online, and how common they are.

Also, I sent an explanation that is similar to Yun's. It seems we always write out mails in parallel and then send them before checking. :-) So you always get two explanations of roughly the same thing.

Best,
Aljoscha
Okay, then at least you guys are in sync ;) (although I'm also not too far away).

I hope I'm not super derailing, but could we reiterate why it's good to get rid of finished tasks (note: I'm also mostly in favor of that)?

1. We can free all acquired resources including buffer pools, state backend(?), threads.
2. The TM can forget about the subtask entirely.
3. We can subsequently downscale.
4. What more?

I'm assuming it's not needed to execute the application at all: the application at one point had all subtasks running, so it's not a resource issue per se (ignoring rescaling).

My idea is not to let the task live longer (except for final checkpoints, where we are all on the same page I guess). I'm just thinking out loud whether we can avoid 2. while still doing 1. and 3.

So can the TM retain some slim information about a finished task to still process RPCs in a potentially different way? Thus, without keeping the costly task thread and operator chains, could we implement some RPC handler that knows this is a finished task and forwards the barrier to the next task/TM? Can we store this slim information in a checkpoint as operator subtask state? Could we transfer this slim information in case of (dynamic) downscaling?

If this somehow works, we would not need to change much in the checkpoint coordinator. It would always inject into sources. We could also ignore the race conditions as long as the TM lives. Checkpointing times are also not worse than with the live task.

A clear downside (assuming feasibility) is that we would have two code paths dealing with barriers. We would also need to keep more information in the TM, but then again, at some point the complete subtask fit.
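A rough sketch of what such a "slim" stand-in on the TaskManager could look like. Everything here is hypothetical (the class, the forwarding interface, and the omission of coordinator acknowledgement), meant only to illustrate the idea of handling a trigger without a task thread.

```java
import java.util.List;

/** Hypothetical "slim" stand-in for a finished subtask kept on the TaskManager. */
final class FinishedSubtaskStub {

    private final String subtaskId;
    private final List<BarrierForwarder> downstream; // where the live subtask used to send barriers

    FinishedSubtaskStub(String subtaskId, List<BarrierForwarder> downstream) {
        this.subtaskId = subtaskId;
        this.downstream = downstream;
    }

    /** Handles the trigger RPC without a task thread: no state to snapshot, just pass the barrier on. */
    void onTriggerCheckpoint(long checkpointId) {
        // The finished subtask contributes an empty state; acknowledgement to the
        // coordinator is omitted in this sketch.
        for (BarrierForwarder forwarder : downstream) {
            forwarder.forwardBarrier(checkpointId, subtaskId);
        }
    }

    interface BarrierForwarder {
        void forwardBarrier(long checkpointId, String fromSubtask);
    }
}
```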
In reply to this post by Yun Gao
Hi Arvid,
Thanks a lot for the deep thoughts!

> If this somehow works, we would not need to change much in the checkpoint
> coordinator. It would always inject into sources. We could also ignore the
> race conditions as long as the TM lives. Checkpointing times are also not
> worse than with the live task.
> A clear downside (assuming feasibility) is that we would have two code
> paths dealing with barriers. We would also need to keep more information
> in the TM, but then again, at some point the complete subtask fit.

I agree that with the slim information the checkpoint would be more unified with the normal process, and it could simplify the changes to the CheckpointCoordinator. I thought about a rough design in this direction, and I still have some concerns:

1. If we want to ensure that we can always trigger the slim finished sources, we must ensure that the TaskExecutor keeping this information is not released due to idle timeout. Thus the slim information would still pin some resources in some scenarios. If we have mixed jobs with both bounded and unbounded sources, the resources would stay pinned.

2. I still have some concerns about introducing a new RPC communication network between TaskManagers. We might need to initiate the RPC connections together with the netty channels and close the RPC channels when the job finishes; this means that on job finish the JM needs to notify the TMs to clean up these connections, which requires reliable RPC messages from JM to TM, which involves timeouts & resends, and the JM might be blocked on closing these connections before finishing. This would also complicate failover, since on failover we need to close the original connections and reconnect according to the new deployment. And for the RPC channels themselves, the TMs also need to maintain heartbeats to ensure the channels are still open, which would increase the burden on the TM RPC services when we have a lot of TMs.

3. For the slim information, we would need to clear the information on failover to avoid the TM being pinned wrongly. This would also require the scheduler to introduce a new process to clear the slim information, similar to cancelling a running task.

4. Since checkpoints are only available to streaming jobs, we might only want to keep the slim information and create the RPC channels between TaskManagers for streaming jobs. Currently, batch jobs roughly only differ in that they do not have a CheckpointCoordinator to trigger checkpoints, and other components are mostly unified. If we also have to decide whether to keep slim information and create the RPC channels, the scheduler and the network layer would then also need to be aware of the execution mode and have a special code path for streaming mode.

For the JM-based method of retrying to trigger the following tasks:

> Here I'm just concerned that we would overload the JM, especially if it's
> cascading: A is triggered in A->B->C but finishes, the JM computes B and
> resends the RPC, but at that time B is also finished. Hence I was thinking
> of using TMs instead and only falling back to the JM if the TM has exited.

If the overhead is the main concern, I roughly think that we might avoid too many failed re-triggers by making the CheckpointCoordinator wait a short period to accumulate more finished notifications before taking action. The JM would have a heavy burden if a large batch of tasks finished in a short time; with the short wait time the JM would be able to smooth out the overload and avoid repeated re-triggering.
Best,
Yun
In reply to this post by Yun Gao
This is somewhat unrelated to the discussion about how to actually do
the triggering when sources shut down; I'll write on that separately. I just wanted to get this quick thought out.

This is about letting operators decide whether they actually want to wait for a final checkpoint, which is relevant at least for Async I/O and potentially for sinks.

We could introduce an interface, sth like `RequiresFinalization` or `FinalizationListener` (all bad names). The operator itself knows when it is ready to completely shut down: Async I/O would wait for all requests, a sink would potentially wait for a given number of checkpoints. The interface would have a method like `isFinalized()` that the framework can call after each checkpoint (and potentially at other points).

This way we would decouple that logic from things that don't actually need it. What do you think?

Best,
Aljoscha
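A minimal sketch of what such an interface could look like, using the names floated in this mail; they are proposals from this discussion, not an existing Flink API.

```java
/**
 * Proposed (hypothetical) operator interface: the framework polls isFinalized()
 * after each completed checkpoint and only then fully shuts the operator down.
 */
public interface FinalizationListener {

    /**
     * Returns true once the operator is ready to shut down completely, e.g. async I/O
     * has no outstanding requests, or a sink has seen the checkpoint(s) it was waiting for.
     */
    boolean isFinalized();
}
```

The framework side would then, after notifying a completed checkpoint, check `isFinalized()` for operators implementing the interface and release the subtask only once it returns true.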
> We could introduce an interface, sth like `RequiresFinalization` or
> `FinalizationListener` (all bad names). The operator itself knows when
> it is ready to completely shut down: Async I/O would wait for all
> requests, a sink would potentially wait for a given number of checkpoints.
> The interface would have a method like `isFinalized()` that the
> framework can call after each checkpoint (and potentially at other
> points).

I think we are mixing two different things here that may require different solutions:
1. Tasks (=sinks) that may need to do something with the final checkpoint.
2. Tasks that only finish after completing operations that do not depend on data flow (async I/O, but I could also think of some timer actions in process functions).

Your proposal would help most for the first case. The second case can be solved entirely with current methods without being especially complicated:
- The EOP is only emitted once Async I/O is done with all background tasks.
- All timers are fired in a process function (I think we rather want to fire immediately on EOP, but that's a different discussion).

The advantage of this approach over your idea is that you don't need to wait for a checkpoint to complete to check for finalization.

Now let's look at the first case. I see two alternatives:
- The new sink interface implicitly incorporates this listener. Since I don't see a use case outside sinks, we could simply add this method to the new sink interface.
- We implicitly assume that a sink is done after having a successful checkpoint at the end. Then we just need a tag interface `RequiresFinalization`. It also feels like we should add the property `final` to the checkpoint options to help the sink detect that this is the last checkpoint to be taken. We could also try to always have the final checkpoint without a tag interface on new sinks...
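A rough sketch of the second alternative. All names here are hypothetical proposals from this thread, not existing Flink classes; in particular, the options class is only a stand-in for whatever would accompany the checkpoint trigger.

```java
/** Hypothetical tag interface: the sink finishes after one successful checkpoint at the end. */
interface RequiresFinalization {}

/** Hypothetical extension of the options delivered with a checkpoint trigger/barrier. */
final class CheckpointOptionsLike {

    private final boolean isFinal;

    CheckpointOptionsLike(boolean isFinal) {
        this.isFinal = isFinal;
    }

    /** A sink can use this to detect that this is the last checkpoint it will see. */
    boolean isFinalCheckpoint() {
        return isFinal;
    }
}
```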
Thanks for starting this discussion (and sorry for probably duplicated questions, I couldn't find them answered in the FLIP or this thread).

1. Option 1 is said to be not preferable because it wastes resources and adds complexity (a new event). However, the resources would be wasted for a relatively short time until the job finishes completely, and compared to other options the complexity seems much lower. Or are differences in task completion times so huge and so common?

2. I think it would be helpful to describe how rescaling is handled in Options 2 and 3 (or maybe it's not supported for jobs about to finish).

3. Option 3 assumes that the state of a finished task is not used. That's true for operator state, but what about channel state (captured by an unaligned checkpoint)? I think it still has to be sent downstream, which invalidates this option.

Regards,
Roman
Hi Roman,
Thanks a lot for the feedback! I'll try to answer the issues inline:

> 1. Option 1 is said to be not preferable because it wastes resources and
> adds complexity (a new event). However, the resources would be wasted for
> a relatively short time until the job finishes completely, and compared to
> other options the complexity seems much lower. Or are differences in task
> completion times so huge and so common?

There might be mixed jobs with both bounded sources and unbounded sources; in this case, the resources for the finished part of the job could not be released.

And Option 1 also complicates the semantics of EndOfPartition: if we keep the tasks alive but still need to notify the following tasks that all records have been sent, we would have to introduce some kind of pre-EndOfPartition message, which is similar to the current EndOfPartition but does not cause the channels to be released.

> 2. I think it would be helpful to describe how rescaling is handled in
> Options 2 and 3 (or maybe it's not supported for jobs about to finish).

For Options 2 and 3 we manage the state per operator, so the process of rescaling would be the same as for a normal checkpoint. For example, suppose one operator resides in a task with parallelism 4 and 2 of the subtasks are finished; the state of the operator is then composed of the state of the 2 remaining subtask instances. If we rescale to 5 after failover, the state of the 2 previously remaining subtasks would be re-distributed to the 5 new subtasks after failover.

If before failover all 4 subtasks are finished, the operator would be marked as finished. After failover the operator would still be marked as finished, and all subtask instances of this operator would skip methods like open(), endOfInput(), close() and would be excluded when taking checkpoints after failover.

> 3. Option 3 assumes that the state of a finished task is not used. That's
> true for operator state, but what about channel state (captured by an
> unaligned checkpoint)? I think it still has to be sent downstream, which
> invalidates this option.

For unaligned checkpoints, if in one checkpoint a subtask is marked as finished, then its descendant tasks would wait until all records have been received from the finished tasks before taking the checkpoint. Thus in this case we would not have result partition state, but only channel state for the downstream tasks that are still running.

In detail, suppose we have a job with the graph A -> B -> C and in one checkpoint A has reported FINISHED. The CheckpointCoordinator would choose B as the new "source" and trigger the checkpoint via RPC. When task B receives the checkpoint trigger, it knows that all its preceding tasks are finished, so it waits until all InputChannels have received EndOfPartition from the network (namely, inputChannel.onBuffer() is called with EndOfPartition) and then takes the snapshot for the input channels, as normal unaligned checkpoints do on the InputChannel side. Then we would be able to ensure that finished tasks always have an empty state.
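A minimal sketch of that condition on the downstream task: when it was triggered directly (because all upstream tasks are finished), the input-channel snapshot may only be taken once every channel has delivered EndOfPartition. The names are hypothetical, not Flink's actual channel state writer.

```java
import java.util.List;

/** Hypothetical decision for the unaligned-checkpoint case when all upstream tasks finished. */
final class ChannelStateSnapshotter {

    /**
     * If the checkpoint was triggered directly at this task (all upstream tasks reported
     * FINISHED), we only snapshot once every channel has delivered EndOfPartition, so the
     * captured channel state is empty and the finished upstream tasks stay stateless.
     */
    static boolean readyToSnapshot(boolean triggeredDirectly, List<Channel> inputChannels) {
        if (!triggeredDirectly) {
            return true; // normal unaligned checkpoint: snapshot in-flight buffers immediately
        }
        return inputChannels.stream().allMatch(Channel::receivedEndOfPartition);
    }

    interface Channel {
        boolean receivedEndOfPartition();
    }
}
```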
I'll also update the FLIP to make this clearer.

Best,
Yun
On Thu, Jan 7, 2021 at 11:58 AM Aljoscha Krettek <[hidden email]> wrote:

This is somewhat unrelated to the discussion about how to actually do the triggering when sources shut down, I'll write on that separately. I just wanted to get this quick thought out.

For letting operators decide whether they actually want to wait for a final checkpoint, which is relevant at least for Async I/O and potentially for sinks:

We could introduce an interface, sth like `RequiresFinalization` or `FinalizationListener` (all bad names). The operator itself knows when it is ready to completely shut down, Async I/O would wait for all requests, sink would potentially wait for a given number of checkpoints. The interface would have a method like `isFinalized()` that the framework can call after each checkpoint (and potentially at other points).

This way we would decouple that logic from things that don't actually need it. What do you think?

Best,
Aljoscha
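A minimal Java sketch of the interface proposed here. The interface and method names are taken from the mail; the surrounding framework hook is invented purely for illustration.

    final class FinalizationListenerSketch {

        // An operator that must stay alive until some condition holds (e.g. no pending async requests).
        interface FinalizationListener {
            // Called by the framework after each completed checkpoint (and possibly at other points).
            boolean isFinalized();
        }

        // Invented framework hook: shut the operator down once it reports being finalized.
        static void onCheckpointCompleted(Object operator, Runnable shutDownOperator) {
            if (!(operator instanceof FinalizationListener)) {
                shutDownOperator.run(); // operators without the interface can finish right away
                return;
            }
            if (((FinalizationListener) operator).isFinalized()) {
                shutDownOperator.run(); // e.g. Async I/O with no outstanding requests
            }
            // otherwise keep the operator running and check again after the next checkpoint
        }
    }

The point of this shape is exactly the decoupling mentioned above: operators that do not implement the interface keep the existing shutdown behavior untouched.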
Thanks a lot for your answers, Yun.
> In detail, suppose we have a job with the graph A -> B -> C, and suppose that in one checkpoint A has reported FINISHED. The CheckpointCoordinator would
> then choose B as the new "source" and trigger the checkpoint via RPC. When task B receives the checkpoint trigger, it knows that all its preceding tasks
> are finished, so it waits until every InputChannel has received EndOfPartition from the network (namely inputChannel.onBuffer() is called with
> EndOfPartition) and then takes a snapshot of the input channels, just as normal unaligned checkpoints do on the InputChannel side. This way
> we can ensure that finished tasks always have an empty state.

Probably it would be simpler to just decline the RPC-triggered checkpoint if not all inputs of this task are finished (with CHECKPOINT_DECLINED_TASK_NOT_READY).

But I wonder how significantly this waiting for EoP from every input will delay performing the first checkpoint by B after becoming a new source. This may in turn impact exactly-once sinks and incremental checkpoints. Maybe a better option would be to postpone the JM notification from the source until its EoP is consumed?

Regards,
Roman
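As a rough illustration of the simpler alternative suggested above, the sketch below declines the RPC-triggered checkpoint instead of waiting for EndOfPartition on every input. All types, including the decline-reason enum, are simplified stand-ins and do not mirror the actual Flink classes.

    import java.util.List;

    final class DeclineInsteadOfWaitingSketch {

        enum DeclineReason { CHECKPOINT_DECLINED_TASK_NOT_READY }

        interface InputChannelView {
            boolean hasReceivedEndOfPartition();
        }

        interface CheckpointResponder {
            void declineCheckpoint(long checkpointId, DeclineReason reason);
            void acknowledgeCheckpoint(long checkpointId);
        }

        static void onRpcTrigger(long checkpointId,
                                 List<InputChannelView> inputs,
                                 CheckpointResponder responder) {
            boolean allInputsFinished =
                    inputs.stream().allMatch(InputChannelView::hasReceivedEndOfPartition);
            if (!allInputsFinished) {
                // Not ready yet: tell the coordinator instead of blocking until EoP arrives.
                responder.declineCheckpoint(
                        checkpointId, DeclineReason.CHECKPOINT_DECLINED_TASK_NOT_READY);
                return;
            }
            // ... take the snapshot of the input channels and acknowledge ...
            responder.acknowledgeCheckpoint(checkpointId);
        }
    }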
Hi Roman,
Thanks a lot for the feedback!

> Probably it would be simpler to just decline the RPC-triggered checkpoint
> if not all inputs of this task are finished (with CHECKPOINT_DECLINED_TASK_NOT_READY).
> But I wonder how significantly this waiting for EoP from every input will delay performing the first checkpoint
> by B after becoming a new source. This may in turn impact exactly-once sinks and incremental checkpoints.
> Maybe a better option would be to postpone the JM notification from the source until its EoP is consumed?

I agree that there could indeed be cases where the checkpoint gets slower because it cannot skip the data in the result partition of the finished upstream task:
a) For aligned checkpoints, this case does not arise, since the downstream tasks always need to process the buffers in order anyway.
b) With unaligned checkpoints enabled, the slowdown might happen if the downstream task processes very slowly.

But since only the result-partition part of the finished upstream task needs to wait to be processed, the rest of the execution graph can still perform unaligned checkpoints normally. I think the average delay would be much lower than with completely aligned checkpoints, but there could still be extremely bad cases where the delay is long.

Declining the RPC-triggered checkpoint would indeed simplify the implementation, but since currently, by default, a failed checkpoint causes a job failover, we have some concerns about directly declining the checkpoint. As for postponing the JM notification from the source: since the JM currently cannot know whether a task has received all the EndOfPartition events from its upstream tasks, we would need to introduce a new RPC for notifying this state, and since the triggering is not atomic we may also run into synchronization issues between JM and TM, which would add some complexity.

Thus another possible option might be to let the upstream task wait until all the pending buffers in its result partition have been flushed before it finishes. We could do this wait only for pipelined result partitions, so it would not affect batch jobs. With this waiting, an unaligned checkpoint could still trigger the upstream task and skip the buffers in the result partition. Since the result-partition state would be kept within the operator state of the last operator, after a failover we would find that the last operator has a non-empty state, and we would restart the tasks containing this operator to resend the snapshotted buffers. Of course this also introduces some complexity, and since the probability of a long delay is lower than in the completely aligned case, do you think it would be OK to view this as an optimization and postpone it to future versions?

Best,
Yun
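A hypothetical sketch of the optimization described in the last paragraph: before the upstream task finishes, it blocks until its pipelined result partition has no pending buffers, so the final checkpoint never needs to capture result-partition state. Type and method names are invented for illustration.

    final class FlushBeforeFinishSketch {

        interface ResultPartitionView {
            boolean isPipelined();        // only pipelined partitions need the wait
            int numberOfQueuedBuffers();  // buffers not yet consumed by the downstream task
        }

        // Called at the end of the task's main loop, before it is reported as FINISHED.
        static void waitUntilFlushed(ResultPartitionView partition) throws InterruptedException {
            if (!partition.isPipelined()) {
                return; // blocking (batch) partitions are unaffected
            }
            while (partition.numberOfQueuedBuffers() > 0) {
                Thread.sleep(10); // a real implementation would use a future/notification instead
            }
            // From here on, a downstream unaligned checkpoint can safely skip this partition
            // because nothing is left in it.
        }
    }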
Hi Yun,
> b) With unaligned checkpoints enabled, the slowdown might happen if the downstream task processes very slowly.

I think UC will be the common case with multiple sources, each with DoP > 1. IIUC, waiting for EoP will be needed on each subtask each time one of its source subtasks finishes.

> But since only the result-partition part of the finished upstream task needs to wait to be processed, the rest of
> the execution graph can still perform unaligned checkpoints normally

Yes, but the checkpoint completion notification will not be sent until all the EoPs are processed.

> Declining the RPC-triggered checkpoint would indeed simplify the implementation, but since currently, by default, a
> failed checkpoint causes a job failover, we have some concerns about directly declining the checkpoint.

Not all declines cause job failure; in particular CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.

> Thus another possible option might be to let the upstream task wait until all the pending buffers in its result partition have been flushed before it finishes.

This is what I meant by "postpone JM notification from source". Just blocking the task thread wouldn't add much complexity, though I'm not sure if it would cause any problems.

> do you think it would be OK to view this as an optimization and postpone it to future versions?

I think that's a good idea.

Regards,
Roman
Hi Roman,
Thanks a lot for the feedback and suggestions!

> I think UC will be the common case with multiple sources, each with DoP > 1.
> IIUC, waiting for EoP will be needed on each subtask each time one of its source subtasks finishes.

Yes, waiting for EoP would be required for each input channel if we do not specially block the finished upstream task.

> Yes, but the checkpoint completion notification will not be sent until all the EoPs are processed.

Indeed, the downstream task that gets triggered must wait until it has received the EoPs from all its input channels. I initially compared it with the completely aligned case; the remaining execution graph after the triggered task could still take normal unaligned checkpoints (e.g., for A -> B -> C -> D, if A is finished and B gets triggered, then B -> C -> D can still take normal unaligned checkpoints). But it still cannot bound the possible maximum delay.

> Not all declines cause job failure; in particular CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.

Sorry, I got the logic wrong here: CHECKPOINT_DECLINED_TASK_NOT_READY indeed does not cause a failure. But since after a failed checkpoint we would have to wait for the checkpoint interval until the next checkpoint, I also agree that the following option is the better one, where we try to complete every checkpoint.

>> Thus another possible option might be to let the upstream task wait until all the pending buffers in its result partition have been flushed before it finishes.
> This is what I meant by "postpone JM notification from source". Just blocking the task thread wouldn't add much complexity, though I'm not sure if it would cause any problems.
>> do you think it would be OK to view this as an optimization and postpone it to future versions?
> I think that's a good idea.

And sorry, I misunderstood the proposal here earlier; currently I also do not see obvious problems with waiting for the flush of the pipelined result partition. Glad that we have the same viewpoint on this issue. :)

Best,
Yun
Hi all,
I updated the FLIP[1] to reflect the major points discussed in the ML thread:

1) For "new" root tasks that finish before they receive the trigger message, we previously proposed to let the JM re-compute and re-trigger the descendant tasks, but during the discussion we realized that this might cause overhead on the JobMaster in cases of cascading finishes and large parallelism. Another option might be to let the StreamTask do one synchronization with the CheckpointCoordinator before it finishes, to become aware of the missed pending checkpoints; since at that point EndOfPartition has not been emitted yet, it can still broadcast barriers to its descendant tasks (a rough sketch of this synchronization follows below). I updated the details in this section[2] of the FLIP.

2) For the barrier alignment, we now change to inserting faked barriers into the input channels, to avoid interfering with the checkpoint alignment algorithms. One remaining issue is that in unaligned checkpoint mode we cannot snapshot an upstream task's result partition if that task has already finished. One option to address this issue is to make the upstream tasks wait for their buffers to be flushed before exiting; we would include this in future versions. I updated this part in this section[3] of the FLIP.

3) Some operators, like the Sink Committer, need to wait for one complete checkpoint before exiting. To support operators that need to wait for some finalization condition, like the Sink Committer and Async I/O, we could introduce a new interface to mark this kind of operator and let the runtime wait until the operator has reached its condition. I updated this part in this section[4] of the FLIP.

Could you have another look at the FLIP and the pending issues? Any feedback is warmly welcomed and appreciated. Thanks!

Best,
Yun

[1] https://cwiki.apache.org/confluence/x/mw-ZCQ
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-ExtendthetaskBarrierAlignment
[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-SupportOperatorsWaitingForCheckpointCompleteBeforeFinish
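As an illustration of option 1) above, here is a sketch of the pre-finish synchronization with invented RPC and method names; it shows only the shape of the idea, not the FLIP's actual design.

    import java.util.List;

    final class PreFinishSyncSketch {

        interface CoordinatorGateway {
            // Returns the ids of checkpoints that were triggered but have not reached this task yet.
            List<Long> getPendingCheckpointIds(String taskId);
        }

        interface BarrierEmitter {
            void broadcastBarrier(long checkpointId);
            void emitEndOfPartition();
        }

        static void finishTask(String taskId, CoordinatorGateway coordinator, BarrierEmitter output) {
            // One synchronization with the CheckpointCoordinator before shutting down.
            for (long checkpointId : coordinator.getPendingCheckpointIds(taskId)) {
                // EndOfPartition has not been emitted yet, so barriers can still flow downstream.
                output.broadcastBarrier(checkpointId);
            }
            output.emitEndOfPartition();
        }
    }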
Hi all,
We had an offline discussion together with @Arvid, @Roman and @Aljoscha, and I'd like to post some of the points we discussed:

1) For the problem that the "new" root task coincidentally finished before getting triggered successfully, we have listed two options in the FLIP-147[1], for the first version, now we are not tend to go with the first option that JM would re-compute and re-trigger new sources when it realized some tasks are not triggered successfully. This option would avoid the complexity of adding new RPC and duplicating task states, and in the average case it would not cause too much overhead.

2) Regarding how to support operators like the Sink Committer that need to wait for one complete checkpoint before exiting: this is more an issue of how to use checkpoints after tasks have finished than of how to achieve checkpoints after tasks have finished, so we would prefer not to include this part in the current discussion. We will discuss and solve this issue separately after FLIP-147 is done.

Best,
Yun

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished
Thanks for the summary! I think we can now move towards a [VOTE] thread,
right?

On 2021/01/15 13:43, Yun Gao wrote:
>1) For the problem that the "new" root task coincidentally finished
>before getting triggered successfully, we have listed two options in
>the FLIP-147[1], for the first version, now we are not tend to go with
>the first option that JM would re-compute and re-trigger new sources
>when it realized some tasks are not triggered successfully. This option
>would avoid the complexity of adding new RPC and duplicating task
>states, and in the average case it would not cause too much overhead.

You wrote "we are *not* tend to go with the first option", but I think you meant to write "we tend to *now* go with the first option", right? That's also how it is in the FLIP, I just wanted to clarify for the mailing list.
Hi Aljoscha,
I think so, since it seems we do not have any other divergences or new objections now. I'll open the vote then. Thanks a lot!

Best,
Yun