Hi everyone,
I would like to start a discussion about adding a new type of scheduler to Flink. The declarative scheduler will first declare the required resources and wait for them before deciding on the actual parallelism of a job. Thereby it can better handle situations where the required resources cannot be fully fulfilled. Moreover, it will act as a building block for the reactive mode, where Flink should scale to the maximum of the currently available resources.

Please find more details in the FLIP wiki document [1]. Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/x/mwtRCg

Cheers,
Till
Till, thanks a lot for the proposal.
Even if the initial phase is only to support scale-up, maybe the "ScaleUpController" interface should be called "RescaleController" so that scale-down can be added in the future.
Thanks for preparing the FLIP and starting the discussion, Till.
I have a few questions trying to understand the design.

## What is the relationship between the current and new schedulers?

IIUC, the new declarative scheduler will coexist with the current scheduler, as an alternative that the user needs to explicitly switch to. Then does it require any changes to the scheduler interfaces and to how the JobMaster interacts with it?

## Is `SlotAllocator` aware of `ExecutionGraph`?

Looking at the interfaces, it seems to me that `SlotAllocator` only takes `JobInformation` and `VertexInformation` as topology inputs. However, it generates `ParallelismAndResourceAssignments`, which maps slots to `ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is generated outside `SlotAllocator` while `ExecutionVertexID`s are generated inside `SlotAllocator`.

## About `ScaleUpController#canScaleUp`

### What is cumulative parallelism?

The interface shows that the cumulative parallelism is a single number per job graph. I assume this is the sum of the parallelism of all vertices?
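For reference, my reading of the interface in the FLIP is roughly the following (a simplified sketch; the actual names and signature may differ):

    // Simplified sketch of how I read the FLIP; not the exact proposed interface.
    public interface ScaleUpController {

        // Both arguments are a single number per job graph (presumably the sum of
        // the parallelism of all vertices), describing the currently running plan
        // and the plan that the newly available slots would allow.
        boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
    }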
### Is this interface expected to be called before or after `SlotAllocator#determineParallelism`?

IIUC, when new resources appear, the scheduler always calls `SlotAllocator#determineParallelism` to generate a new plan based on the available resources, and `ScaleUpController#canScaleUp` is then called to decide whether to apply the new plan, according to whether the increase is significant, how long the job has been running since the last restart, etc. Is that correct?

### The cumulative parallelism may not be enough for deciding whether the job should be scaled up.

I'm assuming my understanding of the above questions is correct. Please correct me if not.

I noticed Chesnay's comment on the wiki page about making the decision based on when the job last entered the executing state.

In addition to that, there could also be situations where a job may not benefit much from an increment in cumulative parallelism. E.g., consider a topology A -> B, where A and B are in different slot sharing groups and the current parallelism of both A and B is 2. When 1 new slot appears, `SlotAllocator` may suggest increasing the parallelism of A to 3. But this may not be a significantly beneficial change for the job, because the overall performance is still bottlenecked by the parallelism of B.

## Cluster vs. Job configuration

I'm not entirely sure about specifying which scheduler to be used via a cluster-level configuration option. Each job in a Flink cluster has its own JobMaster, and which scheduler to use is internal to that JobMaster. I understand that the declarative scheduler requires the cluster to use declarative resource management. However, other jobs should still be allowed to use other scheduler implementations that also support declarative resource management (e.g. the current `DefaultScheduler`). Maybe we should consider the cluster-level configuration option as a default scheduler, and allow the job to specify a different scheduler in its execution config. This is similar to how we specify which state backend to be used.

## Minor: It might be better to also show in the state machine figure that it can go from `Executing` to `Restarting` when new resources appear.

Thank you~

Xintong Song
Thanks for the proposal! @Till Rohrmann <[hidden email]>
The design looks generally good to me. I have 2 concerns though:

# performance on failover

I can see that an ExecutionGraph will be built and initialized on each task failure. This process can be slow:
1. The topology building can be very slow (much slower than the scheduling or restarting process) when the job scale becomes large (see FLINK-20612). Luckily, FLINK-21110 will improve it.
2. The input/output format initialization can be slow due to possible IO work against external services. Maybe we should extract it from ExecutionGraph building?

# execution history lost

After building and using a new ExecutionGraph, the execution history in the old graph will be lost, including the status timestamps of the job as well as prior execution attempts and their failures. Should we let the new EG inherit this information from the prior EG? Maybe as a future plan, with further discussions regarding the varying parallelism.

Thanks,
Zhu
Thanks a lot for all your feedback. Let me try to answer it.
# ScaleUpController vs. RescaleController

At the moment, the idea of the declarative scheduler is to run a job with a parallelism which is as close to the target value as possible but never exceeds it. Since the target value is currently fixed (this will change once auto-scaling is supported), we never have to scale down. In fact, scale-down events only happen if the system loses slots, and then you cannot choose whether to scale down or not. This and the idea to keep the initial design as simple as possible motivated the naming of ScaleUpController. Once we support auto-scaling, we might have to rename this interface. However, since I don't fully know how things will look with auto-scaling, I would like to refrain from this change now.

# Relationship between declarative scheduler and existing implementations

The declarative scheduler will implement the SchedulerNG interface. At the moment I am not aware of any required changes to this interface. Also the way the JobMaster interacts with the scheduler won't change, to the best of my knowledge.

# Awareness of SlotAllocator of the ExecutionGraph

The idea is to make the SlotAllocator not aware of the ExecutionGraph, because the ExecutionGraph can only be created after the SlotAllocator has decided on the parallelism the job can be run with. Moreover, the idea is that the JobInformation instance contains all the required information about a job to make this decision.

The SlotAllocator also reserves the slots for a job before the ExecutionGraph is created. Consequently, we need a way to associate the slots with the Executions of the ExecutionGraph. Here we have decided to use the ExecutionVertexID as the identifier. We could also introduce a new identifier as long as we can map it to the ExecutionVertexID. We could, for example, use the JobVertexID and the subtask index as the identifier, but this is exactly what the ExecutionVertexID is. That's why we decided to reuse it for the time being.
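To illustrate the idea, the relevant parts of the SlotAllocator look roughly like this (simplified here; see the FLIP for the exact signatures):

    // Simplified sketch; method names and types are illustrative only.
    interface SlotAllocator {

        // Decides, based on the job's vertices and the currently available slots,
        // which parallelism each vertex can be run with (or none, if the minimum
        // required resources are not available).
        Optional<VertexParallelism> determineParallelism(
                JobInformation jobInformation, Collection<? extends SlotInfo> freeSlots);

        // Reserves slots for the chosen parallelism. The result maps each
        // ExecutionVertexID (JobVertexID + subtask index) to a reserved slot so that
        // the ExecutionGraph, which is only created afterwards, can pick up the
        // assignments.
        ParallelismAndResourceAssignments assignResources(
                JobInformation jobInformation, VertexParallelism vertexParallelism);
    }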
# ScaleUpController

## Cumulative parallelism

Yes, the cumulative parallelism is the total number of tasks, i.e. the sum of the parallelisms of all vertices.

## When to call the ScaleUpController

Yes, the idea is that the scheduler will check whether one can run the job with an increased parallelism if there are new slots available. If this is the case, then we will ask the ScaleUpController whether we actually should scale up (e.g. whether the increase is significant enough or whether enough time has passed since the last scale-up operation).
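In pseudo-code, the intended flow when new slots become available would roughly be the following (illustrative sketch; the method names are not the final API):

    // Illustrative sketch of the scale-up check; names are not the final API.
    void onNewResourcesAvailable() {
        Optional<VertexParallelism> newPlan =
                slotAllocator.determineParallelism(jobInformation, freeSlots);
        if (!newPlan.isPresent()) {
            return; // no feasible assignment with the available slots
        }

        int currentCumulativeParallelism = cumulativeParallelismOf(currentPlan);
        int newCumulativeParallelism = cumulativeParallelismOf(newPlan.get());

        if (newCumulativeParallelism > currentCumulativeParallelism
                && scaleUpController.canScaleUp(
                        currentCumulativeParallelism, newCumulativeParallelism)) {
            // Executing -> Restarting: restart the job with the higher parallelism.
            restartWithNewParallelism(newPlan.get());
        }
    }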
## Do we provide enough information to the ScaleUpController

I am pretty sure that we don't provide the ScaleUpController with enough information to make the best possible decision. We wanted to keep it simple in the first version and iterate on the interface with the help of user feedback. I think that this interface won't fundamentally change the scheduler and, hence, it shouldn't block future extensions if we start with a simple interface.

# Cluster vs. job configuration

I think you are right that it would be most flexible if one could select a scheduler on a per-job basis, falling back to the cluster configuration if nothing has been specified. For the sake of simplicity and of narrowing down the scope, I would consider this a follow-up.

# Performance on failover

I agree that the performance won't be optimal in the failover case because of 1) having to recreate the EG and 2) redundant IO operations. For 1) I agree that FLINK-21110 will help a lot. For 2) moving the IO operations out of the EG could be a good improvement. With the same argument as above, I would consider this a follow-up, because I would like to keep the initial design as simple as possible and I think that these performance optimizations are not required to make this feature work.

# Lost execution history

You are right, Zhu Zhu, that recreating the ExecutionGraph causes the loss of information. We are currently investigating which information is strictly required and needs to be maintained across ExecutionGraph creations (mainly for tests to succeed). One idea is to either store this information outside of the ExecutionGraph or to initialize the ExecutionGraph with the information from the previous instance. Most likely, we won't give a guarantee that all counters, timestamps and metrics are correctly maintained across failovers in the first version, though. It is much the same argument as above: this is not strictly required to make the feature work. Maybe this means that the feature is not production ready for some users, but I think this is ok.
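Just to illustrate the first idea (purely a sketch, not a concrete proposal), the history could live in a small holder owned by the scheduler that survives ExecutionGraph re-creations:

    // Purely illustrative: a holder, owned by the scheduler, that is carried over
    // whenever a new ExecutionGraph replaces the previous one.
    final class JobExecutionHistory {
        private final List<ErrorInfo> priorFailures = new ArrayList<>();
        private final Map<JobStatus, Long> statusTimestamps = new EnumMap<>(JobStatus.class);

        // Called before the old ExecutionGraph is thrown away.
        void absorb(ErrorInfo latestFailure, Map<JobStatus, Long> latestTimestamps) {
            if (latestFailure != null) {
                priorFailures.add(latestFailure);
            }
            statusTimestamps.putAll(latestTimestamps);
        }
    }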
In general, to fully integrate the declarative scheduler with the web UI, we have to be able to display changing ExecutionGraphs. Ideally, we would have something like a timeline where one can select the point in time for which to display the state of the job execution. If one goes all the way to the right, the system shows the live state, and all other positions are the present time minus some offset.

I will add the follow-ups to the FLIP as potential improvements in order to keep track of them.

Cheers,
Till
Thanks Till for the explanation and the follow-up actions.
That sounds good to me.

Thanks,
Zhu
Thanks for the explanations, Till.
Keeping the initial design as simple as possible sounds good to me. There's no further concern from my side.

Thank you~

Xintong Song
Thanks for preparing the FLIP and driving the discussion, Till. All of my questions have already been answered in the previous discussion.

I just have one minor reminder regarding using the ExecutionVertexID as the identifier. The JobVertexID is generated according to the topology instead of randomly. Thus, it is not guaranteed to be unique across different jobs and different executions. This characteristic is also inherited by the ExecutionVertexID. IIUC, the ParallelismAndResourceAssignments is a job-level concept, so it will work well.
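To illustrate what I mean (a rough sketch, not the actual Flink classes):

    // Rough sketch of the id structure; the real classes live in flink-runtime.
    final class ExecutionVertexIdSketch {
        // Derived deterministically from the topology, so two structurally identical
        // jobs can end up with the same JobVertexIDs.
        JobVertexID jobVertexId;
        // The subtask index within that vertex.
        int subtaskIndex;
        // Hence the pair is only guaranteed to be unique within a single job, which
        // is fine as long as it is used in job-level structures such as
        // ParallelismAndResourceAssignments.
    }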
Best,
Yangze Guo
Yes, since we're only operating within the scheduler, which exists
separately for each job, we don't have to worry about collisions with
other jobs.

On 1/27/2021 11:08 AM, Yangze Guo wrote:
> Thanks for preparing the FLIP and driving the discussion, Till. All of
> my questions have already been answered in the previous discussion.
>
> I just have one minor reminder regarding using ExecutionVertexID as
> the identifier. The JobVertexID is generated according to the
> topology instead of being generated randomly. Thus, it's not guaranteed
> to be unique across different jobs and different executions. This
> characteristic is also inherited by the ExecutionVertexID. IIUC, the
> ParallelismAndResourceAssignments is a job-level concept, so it will
> work well.
>
> Best,
> Yangze Guo
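For readers following the identifier discussion, here is a minimal, self-contained sketch (plain Java with invented names, not Flink's actual classes) of why job-level uniqueness is sufficient: the assignment table lives inside a single scheduler instance, so a key only has to be unique within that one job.

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative stand-in for ExecutionVertexID: job vertex id plus subtask index.
// The id is derived deterministically from the topology, so it is NOT globally
// unique across different jobs.
final class VertexId {
    final String jobVertexId;
    final int subtaskIndex;

    VertexId(String jobVertexId, int subtaskIndex) {
        this.jobVertexId = jobVertexId;
        this.subtaskIndex = subtaskIndex;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof VertexId)) {
            return false;
        }
        VertexId that = (VertexId) o;
        return subtaskIndex == that.subtaskIndex && jobVertexId.equals(that.jobVertexId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobVertexId, subtaskIndex);
    }
}

// One table per scheduler, i.e. per job: two jobs that happen to produce the
// same vertex ids each get their own table, so they can never collide.
final class JobScopedAssignments {
    private final Map<VertexId, String> slotByVertex = new HashMap<>();

    void assign(VertexId vertex, String slotId) {
        slotByVertex.put(vertex, slotId);
    }

    String slotOf(VertexId vertex) {
        return slotByVertex.get(vertex);
    }
}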
|
Thanks a lot for all your input. I have updated FLIP-160 with your
suggestions:

1) Add job configuration as a follow-up
2) Pull the IO operations out of the ExecutionGraph if failover becomes too slow
3) Introduce a configuration parameter for the timeout in the "Waiting for
resources" state (coming from the FLIP-159 discussion)

Next, I will create the vote thread for this FLIP.

Cheers,
Till

On Fri, Jan 29, 2021 at 10:06 AM Chesnay Schepler <[hidden email]> wrote:
> Yes, since we're only operating within the scheduler, which exists
> separately for each job, we don't have to worry about collisions with
> other jobs.
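To make follow-up 3) concrete, here is a rough, self-contained sketch (plain Java; the class and method names are invented for illustration and are not the FLIP's actual interfaces) of a "waiting for resources" step with a configurable timeout: scheduling proceeds as soon as the declared resources arrive, or falls back to the currently available resources once the timeout fires.

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical waiting-for-resources step. Whichever happens first wins:
// the full resource set arrives (true -> schedule with the desired parallelism)
// or the timeout fires (false -> schedule with whatever resources are available).
final class WaitingForResources {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    CompletableFuture<Boolean> awaitResources(
            CompletableFuture<Void> requiredResourcesArrived, Duration timeout) {
        CompletableFuture<Boolean> fullResourceSet = new CompletableFuture<>();
        // First completion wins; later complete() calls are no-ops.
        requiredResourcesArrived.thenRun(() -> fullResourceSet.complete(true));
        timer.schedule(
                () -> fullResourceSet.complete(false),
                timeout.toMillis(),
                TimeUnit.MILLISECONDS);
        return fullResourceSet;
    }
}

A configuration key for this timeout would then simply supply the Duration passed in here.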
|