Hi all,
When running the batch WordCount example, I configured the job execution mode as BATCH_FORCED and the failover-strategy as region, then manually injected errors to make the execution fail in different phases. In some cases the job recovered from the failure and succeeded, but in others it retried several times and then failed.

Example:
- If the failure occurred before the task read any data, e.g., before invokable.invoke() in Task.java, failover could succeed.
- If the failure occurred after the task had read data, failover did not work.

Problem diagnosis:
In the example described above, each ExecutionVertex is defined as a restart region, and the ResultPartitionType between executions is BLOCKING. Thus, SpillableSubpartition and SpillableSubpartitionView are used to write/read shuffle data, and data blocks are represented as BufferConsumers stored in a list called buffers. When a task requests input data from SpillableSubpartitionView, BufferConsumers are REMOVED from buffers. Thus, when a failure occurs after data has been read, some BufferConsumers have already been released. Although the task is retried, its input data is incomplete.

Fix proposal:
- A BufferConsumer should not be removed from buffers until the consuming ExecutionVertex is terminal.
- A SpillableSubpartition should not be released until the consuming ExecutionVertex is terminal.
- A SpillableSubpartition could create multiple SpillableSubpartitionViews, each corresponding to an ExecutionAttempt.

Best,
Bo
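To make the diagnosis and the fix proposal concrete, here is a minimal sketch in Python rather than Flink's Java. It is only a toy model: the class and method names (ConsumeOnceSubpartition, RepeatableSubpartition, SubpartitionView, create_view, on_consumer_terminal) are hypothetical and are not Flink's actual API. The first class removes a buffer when it is read, as described in the diagnosis; the second keeps all buffers until the consumer is terminal and hands out one view, with its own read cursor, per execution attempt.

```python
from typing import Dict, List, Optional


class ConsumeOnceSubpartition:
    """Toy model of the reported behaviour: reading removes the buffer."""

    def __init__(self, buffers: List[bytes]) -> None:
        self._buffers = list(buffers)

    def poll(self) -> Optional[bytes]:
        # The buffer leaves the list, so a retried consumer cannot see it again.
        return self._buffers.pop(0) if self._buffers else None


class RepeatableSubpartition:
    """Toy model of the proposal: buffers stay until the consumer is terminal."""

    def __init__(self, buffers: List[bytes]) -> None:
        self._buffers = list(buffers)
        self._views: Dict[str, "SubpartitionView"] = {}
        self.released = False

    def create_view(self, attempt_id: str) -> "SubpartitionView":
        # One view per execution attempt; each view starts from the first buffer.
        if self.released:
            raise RuntimeError("subpartition already released")
        view = SubpartitionView(self._buffers)
        self._views[attempt_id] = view
        return view

    def on_consumer_terminal(self) -> None:
        # Only now is it safe to drop the data.
        self._buffers.clear()
        self._views.clear()
        self.released = True


class SubpartitionView:
    """Reads the shared buffer list through a private cursor, removing nothing."""

    def __init__(self, buffers: List[bytes]) -> None:
        self._buffers = buffers
        self._cursor = 0

    def poll(self) -> Optional[bytes]:
        if self._cursor >= len(self._buffers):
            return None
        buf = self._buffers[self._cursor]
        self._cursor += 1
        return buf
```

In this model, a second attempt that calls create_view after the first attempt has failed midway still reads the complete input, which is the property the fix proposal asks for.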
Hi,
I’m not sure how much effort we will be willing to invest in the existing batch stack. We are currently focusing on support for bounded DataStreams (already done in Blink and to be merged into Flink soon) and on unifying batch and stream under the DataStream API.

Piotrek

On 23 Jan 2019, at 04:45, Bo WANG <[hidden email]> wrote:
> [...]
The SpillableSubpartition can also be used during the execution of bounded DataStream programs. I think this is largely independent of deprecating the DataSet API.

I am wondering whether this particular issue has already been addressed in the Blink code (we are looking to merge much of that functionality), because the proposed extension is actually necessary for proper batch fault tolerance (independent of the DataSet or Query Processor stack).

I am adding Kurt to this thread; maybe he can help us find that out.

On Thu, Jan 24, 2019 at 2:36 PM Piotr Nowojski <[hidden email]> wrote:
> [...]
Hi Bo,
The problems you mention can be summarized as two issues:

1. The failover strategy should consider whether the partition produced upstream is still available when the downstream fails. If the produced partition is available, only the downstream region needs to be restarted; otherwise the upstream region should also be restarted to re-produce the partition data.
2. The lifecycle of a partition: currently, once the partition data has been completely transferred over the network, the partition and view are released on the producer side, regardless of whether the data has actually been processed by the consumer. The TaskManager may even be released earlier, while the partition data has not yet been transferred.

Both issues are already considered in my proposed pluggable shuffle manager architecture, which would introduce the ShuffleMaster component to manage partitions globally on the JobManager side. It is then natural to solve the above problems on top of this architecture. You can refer to the FLIP [1] if interested.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Manager

Best,
Zhijiang

------------------------------------------------------------------
From: Stephan Ewen <[hidden email]>
Send Time: Thursday, 24 January 2019, 22:17
To: dev <[hidden email]>; Kurt Young <[hidden email]>
Subject: Re: [DISCUSS] Shall we make SpillableSubpartition repeatedly readable to support fine grained recovery

[...]
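The first issue in Zhijiang's message (restart only the downstream region if the upstream partition is still available, otherwise restart the upstream region as well) can be sketched as a small graph walk. This is an illustrative Python model under stated assumptions, not Flink's failover strategy: the function name regions_to_restart, the producers mapping, and the partition_available callback are all hypothetical.

```python
from typing import Callable, Dict, List, Set


def regions_to_restart(
    failed_region: str,
    producers: Dict[str, List[str]],
    partition_available: Callable[[str, str], bool],
) -> Set[str]:
    """Return the failed region plus every upstream region whose output is gone.

    producers maps a region to the regions that produce its input.
    partition_available(producer, consumer) says whether the partition that
    producer wrote for consumer can still be read.
    """
    to_restart: Set[str] = set()
    stack = [failed_region]
    while stack:
        region = stack.pop()
        if region in to_restart:
            continue
        to_restart.add(region)
        for producer in producers.get(region, []):
            # A producer is restarted only when its partition cannot be re-read;
            # its own inputs are then checked the same way.
            if not partition_available(producer, region):
                stack.append(producer)
    return to_restart
```

With all partitions readable, only the failed region is returned; each unavailable partition pulls its producer (and, transitively, that producer's missing inputs) into the restart set.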
Thanks to Zhijiang for the detailed explanation. I would like to add a few points.

Blink has indeed solved this particular problem: Blink can identify the problem and will restart the upstream.

Thanks

On Fri, Jan 25, 2019 at 12:04 PM zhijiang <[hidden email]> wrote:
> [...]
Let's make sure that this is on the list of patches we merge from the Blink branch...

On Fri, Jan 25, 2019, 07:56 Guowei Ma <[hidden email]> wrote:
> [...]
Thanks all for the replies.
Though the current implementation in Blink can handle some cases of this problem, the solution is neither straightforward nor efficient enough, and it can result in unnecessary restarts of already terminated upstream tasks. The following word count job with parallelism 2 demonstrates the retry process:

T0: Map(1/2)#1, Map(2/2)#1 - both terminate and produce partitions for Reduce(1/2) and Reduce(2/2)
T1: Reduce(1/2)#1, Reduce(2/2)#1 - Reduce(1/2)#1 throws a RuntimeException and fails due to error injection; Reduce(2/2)#1 terminates.
T3: Reduce(1/2)#2 restarts and throws PartitionNotFoundException of Map(1/2)
T4: Map(1/2)#2 restarts, produces partitions, and terminates
T5: Reduce(1/2)#3 restarts and successfully consumes partitions produced by Map(1/2)#2, but throws PartitionNotFoundException of Map(2/2)
T6: Map(2/2)#2 restarts, produces partitions, and terminates
T7: Reduce(1/2)#4 restarts and throws PartitionNotFoundException of Map(1/2) (since the partition produced by Map(1/2)#2 has been consumed by Reduce(1/2)#3)
T8: Map(1/2)#3 restarts, produces partitions, and terminates
T9: Reduce(1/2)#5 restarts, consumes partitions from Map(1/2)#3 and Map(2/2)#2, and terminates.

Thus, Map restarts 3 times and Reduce restarts 4 times in total during this failover. We can conclude that with a Map vertex of parallelism n, Map will restart 2^n - 1 times and Reduce will restart 2^n times.

We can see that each time Reduce(1/2) retries, the subpartitions produced by Map(1/2) are consumed and thus cannot be reused for a possible later retry of the downstream task. In fact, Map(1/2) does not need to restart every time, since its output would still be usable if partitions were repeatedly readable.

Based on this observation, we propose to improve the current implementation in Blink by reusing the output for possible later failures and restarting a task only if its output is missing or corrupted. A repeatedly readable SpillableSubpartition and restarting the upstream producer are complementary rather than exclusive features. Combining the advantages of the two methods would make failure recovery more efficient and effective.

On Sat, Jan 26, 2019 at 11:54 PM Stephan Ewen <[hidden email]> wrote:
> [...]
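The restart counts in the timeline above can be reproduced with a small simulation. This is a sketch under simplifying assumptions that go slightly beyond the message: the Reduce task reads its Map inputs in a fixed order, a consumed partition is gone when partitions are not repeatable, and a missing partition fails the attempt and restarts exactly one Map task. The function name simulate_recovery and its parameters are hypothetical.

```python
from typing import Tuple


def simulate_recovery(n_maps: int, repeatable: bool) -> Tuple[int, int]:
    """Return (map_restarts, reduce_restarts) after one injected Reduce failure.

    The first Reduce attempt is assumed to have read every input before
    failing, so with consume-once partitions nothing is left to re-read.
    """
    available = [repeatable] * n_maps
    map_restarts = 0
    reduce_restarts = 0
    while True:
        reduce_restarts += 1
        missing = None
        for i in range(n_maps):
            if not available[i]:
                missing = i  # corresponds to PartitionNotFoundException of Map i
                break
            if not repeatable:
                available[i] = False  # reading consumes the partition
        if missing is None:
            return map_restarts, reduce_restarts
        # Only the producer of the missing partition is restarted.
        available[missing] = True
        map_restarts += 1
```

For n_maps=2 without repeatable partitions this walks through the same sequence as T3 to T9 and returns (3, 4); for n_maps=3 it returns (7, 8), in line with 2^n - 1 and 2^n. With repeatable partitions the model needs a single Reduce restart and no Map restart.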
Hi Bo,
In current Blink implementation, the failover strategy can only confirm restart the upstream task region for some special exceptions reporeted by downstream task failure. As you said, if the partition is consumed once by downstream task, then it would be removed and can not be consumed again even though the data is still available on disk. The key problem as I mentioned in last email is loss of parrition management on JM side. JM is only aware of execution states currently but not aware of corresponding partition state during deciding which region shoulde be restarted. So there would result in unnecessary restart process after partition is removed from TaskManager shuffle service. As I mentioned, the new proposed shuffle manager achitecture already considers the partition management in ShuffleMaster componenet on JM side. That means the failover strategy could get correct partition state via ShuffleMaster#getFeature on JM side, so it can make the proper decision of restarting regions. And the partition state might be updated via communication from ShuffleService on TM side to ShuffleMaster on JM side. I and @Andrey Zagrebin are working on this feature now, and I think it would solve all your concerns based on this architecuture. Best, Zhijiang ------------------------------------------------------------------ From:Bo WANG <[hidden email]> Send Time:2019年1月28日(星期一) 15:11 To:dev <[hidden email]> Cc:wangzhijiang999 <[hidden email]>; Kurt Young <[hidden email]> Subject:Re: [DISCUSS] Shall we make SpillableSubpartition repeatedly readable to support fine grained recovery Thanks all for the replies. Though the current implementation in blink can handle some cases of the problem, the solution is not straight forward and efficient enough and can result in unnecessary restarting of terminated upstream task. 
The following word count job with parallelism 2 can demonstrate the retry process: T0: Map(1/2)#1, Map(2/2)#1 - both terminate and produce partitions for Reduce(1/2) and Reduce(2/2) T1: Reduce(1/2)#1, Reduce(2/2)#1 - Reduce(1/2)#1 throws a RuntimeException and fails due to error injection, Reduce(2/2)#1 terminates. T3: Reduce(1/2)#2 restarts and throws PartitionNotFoundException of Map(1/2) T4: Map(1/2)#2 restarts, produces partitions, and terminates T5: Reduce(1/2)#3 restarts and successfully consumes partitions produced by Map(1/2)#2, but throws PartitionNotFoundException of Map(2/2) T6: Map(2/2)#2 restarts, produces partitions, and terminates T7: Reduce(1/2)#4 restarts and throws PartitionNotFoundException of Map(1/2) (since partition produced by Map(1/2)#2 has been consumed by Reduce(1/2)#3) T8: Map(1/2)#3 restarts, produces partitions, and terminates T9: Reduce(1/2)#5 restarts, consumes partitions from Map(1/2)#3 and Map(2/2)#2, and terminates. Thus, Map restart 3 times and Reduce restart 4 times totally in this fail over. And we could conclude that with parallelism of n of Map vertex, Map will restart 2^n - 1 times, and Reduce will restart 2^n times. We can found that each time Reduce(1/2) retries, all subpartitions produced by Map(1/2) are consumed and thus cannot be reused for possible later downstream task retry. In fact, Map(1/2) is not necessary to restart every time since its output is OK if partitions are repeatedly readable. Based on this observation, we propose to improve the current implementation in Blink by reusing the output for possible later failure and re-start task only if its output is missing or corrupted. Repeatably readable SpillableSubpartition and restarting upstream producer are complementary rather than exclusive features. Combining with advantages of these two methods, failure recovery would be more efficient and effective. 

On Sat, Jan 26, 2019 at 11:54 PM Stephan Ewen <[hidden email]> wrote:
>
> Let's make sure that this is on the list of patches we merge from the blink
> branch...
>
> On Fri, Jan 25, 2019, 07:56 Guowei Ma <[hidden email]> wrote:
>
> > Thanks to zhijiang for the detailed explanation. I would add a few points:
> > Blink has indeed solved this particular problem. The problem can be
> > identified in Blink, and the upstream will be restarted by Blink.
> > Thanks
> >
> > zhijiang <[hidden email]> wrote on Fri, Jan 25, 2019 at 12:04 PM:
> >
> > > Hi Bo,
> > >
> > > The problems you mentioned can be summarized into two issues:
> > >
> > > 1. The failover strategy should consider whether the upstream produced
> > > partition is still available when the downstream fails. If the produced
> > > partition is available, then only the downstream region needs to be
> > > restarted; otherwise the upstream region should also be restarted to
> > > re-produce the partition data.
> > > 2. The lifecycle of the partition: currently, once the partition data has
> > > been transferred via the network completely, the partition and view are
> > > released from the producer side, no matter whether the data has actually
> > > been processed by the consumer or not. The TaskManager may even be
> > > released earlier, when the partition data has not been transferred yet.
> > >
> > > Both issues are already considered in my proposed pluggable shuffle
> > > manager architecture, which would introduce the ShuffleMaster component
> > > to manage partitions globally on the JobManager side, so it is natural to
> > > solve the above problems based on this architecture. You can refer to the
> > > FLIP [1] if interested.
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Manager
> > >
> > > Best,
> > > Zhijiang
> > > ------------------------------------------------------------------
> > > From: Stephan Ewen <[hidden email]>
> > > Send Time: Thursday, January 24, 2019, 22:17
> > > To: dev <[hidden email]>; Kurt Young <[hidden email]>
> > > Subject: Re: [DISCUSS] Shall we make SpillableSubpartition repeatedly
> > > readable to support fine grained recovery
> > >
> > > The SpillableSubpartition can also be used during the execution of
> > > bounded DataStreams programs. I think this is largely independent from
> > > deprecating the DataSet API.
> > >
> > > I am wondering if this particular issue is one that has been addressed in
> > > the Blink code already (we are looking to merge much of that
> > > functionality) - because the proposed extension is actually necessary for
> > > proper batch fault tolerance (independent of the DataSet or Query
> > > Processor stack).
> > >
> > > I am adding Kurt to this thread - maybe he can help us find that out.

Thanks, Bo, for your interesting example; it seems the current task retry strategy could be more efficient. The ExternalShuffleService refactor is an exciting improvement for both shuffle and failover. Bo proposed keeping the SubPartition buffers available for a possible later retry until the consumer really terminates. Basically, I think the proposal is part of the ExternalShuffleService refactor, but it can proceed independently, considering the progress of the refactor. The code change (PR7537 <https://github.com/apache/flink/pull/7537>) for this proposal can benefit the current task retry implementation in the short term, as a first step toward the long-term refactor.

Best,
Ryan

zhijiang <[hidden email]> wrote on Mon, Jan 28, 2019 at 4:33 PM:

> Hi Bo,
>
> In the current Blink implementation, the failover strategy can only decide
> to restart the upstream task region for some special exceptions reported by
> the downstream task failure. As you said, if the partition is consumed once
> by the downstream task, it is removed and cannot be consumed again, even
> though the data is still available on disk.
>
> The key problem, as I mentioned in my last email, is the lack of partition
> management on the JM side. The JM is currently only aware of execution
> states, not of the corresponding partition states, when deciding which
> region should be restarted. This results in unnecessary restarts after a
> partition is removed from the TaskManager shuffle service. As I mentioned,
> the newly proposed shuffle manager architecture already considers partition
> management in the ShuffleMaster component on the JM side. That means the
> failover strategy could get the correct partition state via
> ShuffleMaster#getFeature on the JM side, so it can make the proper decision
> about which regions to restart. And the partition state might be updated
> via communication from the ShuffleService on the TM side to the
> ShuffleMaster on the JM side.
>
> @Andrey Zagrebin and I are working on this feature now, and I think it
> would address all your concerns based on this architecture.
>
> Best,
> Zhijiang