[DISCUSS] FLIP-138: Declarative Resource management

classic Classic list List threaded Threaded
15 messages Options
Reply | Threaded
Open this post in threaded view
|

[DISCUSS] FLIP-138: Declarative Resource management

Chesnay Schepler-3
Hello,

in FLIP-138 we want to rework the way the JobMaster acquires slots, such
that required resources are declared before a job is scheduled and th
job execution is adjusted according to the provided resources (e.g.,
reducing parallelism), instead of asking for a fixed number of resources
during scheduling and failing midway through if not enough resources are
available.

This is a stepping stone towards reactive mode, where Flink will
automatically make use of new TaskExecutors being started.

More details can be found here
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management>.

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-138: Declarative Resource management

Xintong Song
Thanks for preparing the FLIP and driving this discussion, @Chesnay & @Till.

I really like the idea. I see a great value in the proposed declarative
resource management, in terms of flexibility, usability and efficiency.

I have a few comments and questions regarding the FLIP design. In general,
the protocol design makes good sense to me. My main concern is that it is
not very clear to me what changes are required from the
Resource/SlotManager side to adapt to the new protocol.

*1. Distributed slots across different jobs*

Jobs which register their requirements first, will have precedence over
> other jobs also if the requirements change during the runtime.

Just trying to understand, does this mean jobs are prioritized by the order
of their first resource declaring?

*2. AllocationID*

Is this FLIP suggesting to completely remove AllocationID?

I'm fine with this change. It seems where AllocationID is used can either
be removed or be replaced by JobID. This reflects the concept that slots
are now assigned to a job instead of its individual slot requests.

I would like to bring to attention that this also requires changes on the
TM side, with respect to FLIP-56[1].

In the context of dynamic slot allocation introduced by FLIP-56, slots do
not pre-exist on TM and are dynamically created when RM calls
TaskExecutorGateway.requestSlot. Since the slots do not pre-exist, nor
their SlotIDs, RM requests slots from TM with a special SlotID (negative
slot index). The semantic changes from "requesting the slot identified by
the given SlotID" to "requesting a slot with the given resource profile".
The AllocationID is used for identifying the dynamic slots in such cases.

From the perspective of FLIP-56 and fine grained resource management, I'm
fine with removing AllocationID. In the meantime, we would need TM to
recognize the special negative indexed SlotID and generate a new unique
SlotID for identifying the slot.

*3. Minimum resource requirement*

However, we can let the JobMaster know if we cannot fulfill the minimum
> resource requirement for a job after
> resourcemanager.standalone.start-up-time has passed.

What is the "minimum resource requirement for a job"? Did I overlook
anything?

*4. Offer/free slots between JM/TM*

This probably deserves a separate discussion thread. Just want to bring it
up.

The idea has been coming to me for quite some time. Is this design, that JM
requests resources from RM while accepting/releasing resources from/to TM,
the right thing?

The pain point is that events of JM's activities (requesting/releasing
resources) arrive at RM out of order. This leads to several problems.

   - When a job fails and task cancelation takes long, some of the slots
   might be released from the slot pool due to being unused for a while. Then
   the job restarts and requests these slots again. At this time, RM may
   receive slot requests before noticing from TM heartbeats that previous
   slots are released, thus requesting new resources. I've seen many times
   that the Yarn cluster has a heavy load and is not allocating resources
   quickly enough, which leads to slot request timeout and job failover, and
   during the failover more resources are requested which adds more load to
   the Yarn cluster. Happily, this should be improved with the declarative
   resource management. :)
   - As described in this FLIP, it is possible that RM learns the releasing
   of slots from TM heartbeat before noticing the resource requirement
   decreasing, it may allocate more resources which need to be released soon.
   - It complicates the ResourceManager/SlotManager, by requiring an
   additional slot state PENDING, which means the slot is assigned by RM but
   is not confirmed successfully ordered by TM.

Why not just make RM offer the allocated resources (TM address, SlotID,
etc.) to JM, and JM release resources to RM? So that for all the resource
management JM talks to RM, and for the task deployment and execution it
talks to TM?

I tried to understand the benefits for having the current design, and found
the following in FLIP-6[2].

> All that the ResourceManager does is negotiate between the
> cluster-manager, the JobManager, and the TaskManagers. Its state can hence
> be reconstructed from re-acquiring containers and re-registration from
> JobManagers and TaskManagers


Correct me if I'm wrong, it seems the original purpose is to make sure the
assignment between jobs and slots are confirmed between JM and TMs, so that
failures of RM will not lead to any inconsistency. However, this only
benefits scenarios where RM fails while JM and TMs live. Currently, JM and
RM are in the same process. We do not really have any scenario where RM
fails alone. We might separate JM and RM to different processes in future,
but as far as I can see we don't have such requirements at the moment. It
seems to me that we are suffering the current problems, complying to
potential future benefits.

Maybe I overlooked something.

*5. Implementation Plan*

For SlotPool, it sounds quite straightforward to "aggregate individual slot
requests".

For Resource/SlotManager, it seems there are quite a lot changes needed,
with the removal of individual slot requests and AllocationID. It's not
clear to me what is the first step plan for RM/SM? Do we internally treat
the resource requirements as individual slot requests as the first step, so
only the interfaces are changed? Or do we actually change (practically
re-write) the slot allocation logics?

Thank you~

Xintong Song


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
[2]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077

On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <[hidden email]> wrote:

> Hello,
>
> in FLIP-138 we want to rework the way the JobMaster acquires slots, such
> that required resources are declared before a job is scheduled and th
> job execution is adjusted according to the provided resources (e.g.,
> reducing parallelism), instead of asking for a fixed number of resources
> during scheduling and failing midway through if not enough resources are
> available.
>
> This is a stepping stone towards reactive mode, where Flink will
> automatically make use of new TaskExecutors being started.
>
> More details can be found here
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
> >.
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-138: Declarative Resource management

Chesnay Schepler-3
Thank you Xintong for your questions!


        Job prioritization

Yes, the job which declares it's initial requirements first is prioritized.
This is very much for simplicity; for example this avoids the nasty case
where all jobs get some resources, but none get enough to actually run
the job.


        Minimum resource requirements

My bad; at some point we want to allow the JobMaster to declare a range
of resources it could use to run a job, for example min=1, target=10,
max=+inf.

With this model, the RM would then try to balance the resources such
that as many jobs as possible are as close to the target state as possible.

Currently, the minimum/target/maximum resources are all the same. So the
notification is sent whenever the current requirements cannot be met.


        Allocation IDs

We do intend to, at the very least, remove AllocationIDs on the
SlotManager side, as they are just not required there.

On the slotpool side we have to keep them around at least until the
existing Slotpool implementations are removed (not sure whether we'll
fully commit to this in 1.12), since the interfaces use AllocationIDs,
which also bleed into the JobMaster.
The TaskExecutor is in a similar position.
But in the long-term, yes they will be removed, and most usages will
probably be replaced by the SlotID.


        FLIP-56

Dynamic slot allocations are indeed quite interesting and raise a few
questions; for example, the main purpose of it is to ensure maximum
resource utilization. In that case, should the JobMaster be allowed to
re-use a slot it if the task requires less resources than the slot
provides, or should it always request a new slot that exactly matches?

There is a trade-off to be made between maximum resource utilization
(request exactly matching slots, and only re-use exact matches) and
quicker job deployment (re-use slot even if they don't exactly match,
skip round-trip to RM).

As for how to handle the lack of a preemptively known SlotIDs, that
should be fine in and of itself; we already handle a similar case when
we request a new TaskExecutor to be started. So long as there is some
way to know how many resources the TaskExecutor has in total I do not
see a problem at the moment. We will get the SlotID eventually by virtue
of the heartbeat SlotReport.


        Implementation plan (SlotManager)

You are on the right track. The SlotManager tracks the declared resource
requirements, and if the requirements increased it creates a
SlotRequest, which then goes through similar code paths as we have at
the moment (try to find a free slot, if found tell the TM, otherwise try
to request new TM).
The SlotManager changes are not that substantial to get a working
version; we have a PoC and most of the work went into refactoring the
SlotManager into a more manageable state. (split into several
components, stricter and simplified Slot life-cycle, ...).


        Offer/free slots between JM/TM

Gotta run, but that's a good question and I'll think about. But I think
it comes down to making less changes, and being able to leverage
existing reconciliation protocols.
Do note that TaskExecutor also explicitly inform the RM about freed
slots; the heartbeat slot report is just a safety net.
I'm not sure whether slot requests are able to overtake a slot release;
@till do you have thoughts on that?
As for the race condition between the requirements reduction and slot
release, if we run into problems we have the backup plan of only
releasing the slot after the requirement reduction has been acknowledged.

On 26/08/2020 10:31, Xintong Song wrote:

> Thanks for preparing the FLIP and driving this discussion, @Chesnay & @Till.
>
> I really like the idea. I see a great value in the proposed declarative
> resource management, in terms of flexibility, usability and efficiency.
>
> I have a few comments and questions regarding the FLIP design. In general,
> the protocol design makes good sense to me. My main concern is that it is
> not very clear to me what changes are required from the
> Resource/SlotManager side to adapt to the new protocol.
>
> *1. Distributed slots across different jobs*
>
> Jobs which register their requirements first, will have precedence over
>> other jobs also if the requirements change during the runtime.
> Just trying to understand, does this mean jobs are prioritized by the order
> of their first resource declaring?
>
> *2. AllocationID*
>
> Is this FLIP suggesting to completely remove AllocationID?
>
> I'm fine with this change. It seems where AllocationID is used can either
> be removed or be replaced by JobID. This reflects the concept that slots
> are now assigned to a job instead of its individual slot requests.
>
> I would like to bring to attention that this also requires changes on the
> TM side, with respect to FLIP-56[1].
>
> In the context of dynamic slot allocation introduced by FLIP-56, slots do
> not pre-exist on TM and are dynamically created when RM calls
> TaskExecutorGateway.requestSlot. Since the slots do not pre-exist, nor
> their SlotIDs, RM requests slots from TM with a special SlotID (negative
> slot index). The semantic changes from "requesting the slot identified by
> the given SlotID" to "requesting a slot with the given resource profile".
> The AllocationID is used for identifying the dynamic slots in such cases.
>
> >From the perspective of FLIP-56 and fine grained resource management, I'm
> fine with removing AllocationID. In the meantime, we would need TM to
> recognize the special negative indexed SlotID and generate a new unique
> SlotID for identifying the slot.
>
> *3. Minimum resource requirement*
>
> However, we can let the JobMaster know if we cannot fulfill the minimum
>> resource requirement for a job after
>> resourcemanager.standalone.start-up-time has passed.
> What is the "minimum resource requirement for a job"? Did I overlook
> anything?
>
> *4. Offer/free slots between JM/TM*
>
> This probably deserves a separate discussion thread. Just want to bring it
> up.
>
> The idea has been coming to me for quite some time. Is this design, that JM
> requests resources from RM while accepting/releasing resources from/to TM,
> the right thing?
>
> The pain point is that events of JM's activities (requesting/releasing
> resources) arrive at RM out of order. This leads to several problems.
>
>     - When a job fails and task cancelation takes long, some of the slots
>     might be released from the slot pool due to being unused for a while. Then
>     the job restarts and requests these slots again. At this time, RM may
>     receive slot requests before noticing from TM heartbeats that previous
>     slots are released, thus requesting new resources. I've seen many times
>     that the Yarn cluster has a heavy load and is not allocating resources
>     quickly enough, which leads to slot request timeout and job failover, and
>     during the failover more resources are requested which adds more load to
>     the Yarn cluster. Happily, this should be improved with the declarative
>     resource management. :)
>     - As described in this FLIP, it is possible that RM learns the releasing
>     of slots from TM heartbeat before noticing the resource requirement
>     decreasing, it may allocate more resources which need to be released soon.
>     - It complicates the ResourceManager/SlotManager, by requiring an
>     additional slot state PENDING, which means the slot is assigned by RM but
>     is not confirmed successfully ordered by TM.
>
> Why not just make RM offer the allocated resources (TM address, SlotID,
> etc.) to JM, and JM release resources to RM? So that for all the resource
> management JM talks to RM, and for the task deployment and execution it
> talks to TM?
>
> I tried to understand the benefits for having the current design, and found
> the following in FLIP-6[2].
>
>> All that the ResourceManager does is negotiate between the
>> cluster-manager, the JobManager, and the TaskManagers. Its state can hence
>> be reconstructed from re-acquiring containers and re-registration from
>> JobManagers and TaskManagers
>
> Correct me if I'm wrong, it seems the original purpose is to make sure the
> assignment between jobs and slots are confirmed between JM and TMs, so that
> failures of RM will not lead to any inconsistency. However, this only
> benefits scenarios where RM fails while JM and TMs live. Currently, JM and
> RM are in the same process. We do not really have any scenario where RM
> fails alone. We might separate JM and RM to different processes in future,
> but as far as I can see we don't have such requirements at the moment. It
> seems to me that we are suffering the current problems, complying to
> potential future benefits.
>
> Maybe I overlooked something.
>
> *5. Implementation Plan*
>
> For SlotPool, it sounds quite straightforward to "aggregate individual slot
> requests".
>
> For Resource/SlotManager, it seems there are quite a lot changes needed,
> with the removal of individual slot requests and AllocationID. It's not
> clear to me what is the first step plan for RM/SM? Do we internally treat
> the resource requirements as individual slot requests as the first step, so
> only the interfaces are changed? Or do we actually change (practically
> re-write) the slot allocation logics?
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>
> On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <[hidden email]> wrote:
>
>> Hello,
>>
>> in FLIP-138 we want to rework the way the JobMaster acquires slots, such
>> that required resources are declared before a job is scheduled and th
>> job execution is adjusted according to the provided resources (e.g.,
>> reducing parallelism), instead of asking for a fixed number of resources
>> during scheduling and failing midway through if not enough resources are
>> available.
>>
>> This is a stepping stone towards reactive mode, where Flink will
>> automatically make use of new TaskExecutors being started.
>>
>> More details can be found here
>> <
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
>>> .
>>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-138: Declarative Resource management

Xintong Song
Thanks for the quick response.

*Job prioritization, Allocation IDs, Minimum resource
requirements, SlotManager Implementation Plan:* Sounds good to me.

*FLIP-56*
Good point about the trade-off. I believe maximum resource utilization and
quick deployment are desired in different scenarios. E.g., a long running
streaming job deserves some deployment latency to improve the resource
utilization, which benefits the entire lifecycle of the job. On the other
hand, short batch queries may prefer quick deployment, otherwise the time
for resource allocation might significantly increase the response time.
It would be good enough for me to bring these questions to attention.
Nothing that I'm aware of should block this FLIP.

Thank you~

Xintong Song



On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <[hidden email]> wrote:

> Thank you Xintong for your questions!
> Job prioritization
> Yes, the job which declares it's initial requirements first is prioritized.
> This is very much for simplicity; for example this avoids the nasty case
> where all jobs get some resources, but none get enough to actually run the
> job.
> Minimum resource requirements
>
> My bad; at some point we want to allow the JobMaster to declare a range of
> resources it could use to run a job, for example min=1, target=10,
> max=+inf.
>
> With this model, the RM would then try to balance the resources such that
> as many jobs as possible are as close to the target state as possible.
>
> Currently, the minimum/target/maximum resources are all the same. So the
> notification is sent whenever the current requirements cannot be met.
> Allocation IDs
> We do intend to, at the very least, remove AllocationIDs on the
> SlotManager side, as they are just not required there.
>
> On the slotpool side we have to keep them around at least until the
> existing Slotpool implementations are removed (not sure whether we'll fully
> commit to this in 1.12), since the interfaces use AllocationIDs, which also
> bleed into the JobMaster.
> The TaskExecutor is in a similar position.
> But in the long-term, yes they will be removed, and most usages will
> probably be replaced by the SlotID.
> FLIP-56
>
> Dynamic slot allocations are indeed quite interesting and raise a few
> questions; for example, the main purpose of it is to ensure maximum
> resource utilization. In that case, should the JobMaster be allowed to
> re-use a slot it if the task requires less resources than the slot
> provides, or should it always request a new slot that exactly matches?
>
> There is a trade-off to be made between maximum resource utilization
> (request exactly matching slots, and only re-use exact matches) and quicker
> job deployment (re-use slot even if they don't exactly match, skip
> round-trip to RM).
>
> As for how to handle the lack of a preemptively known SlotIDs, that should
> be fine in and of itself; we already handle a similar case when we request
> a new TaskExecutor to be started. So long as there is some way to know how
> many resources the TaskExecutor has in total I do not see a problem at the
> moment. We will get the SlotID eventually by virtue of the heartbeat
> SlotReport.
> Implementation plan (SlotManager)
> You are on the right track. The SlotManager tracks the declared resource
> requirements, and if the requirements increased it creates a SlotRequest,
> which then goes through similar code paths as we have at the moment (try to
> find a free slot, if found tell the TM, otherwise try to request new TM).
> The SlotManager changes are not that substantial to get a working version;
> we have a PoC and most of the work went into refactoring the SlotManager
> into a more manageable state. (split into several components, stricter and
> simplified Slot life-cycle, ...).
> Offer/free slots between JM/TM
> Gotta run, but that's a good question and I'll think about. But I think it
> comes down to making less changes, and being able to leverage existing
> reconciliation protocols.
> Do note that TaskExecutor also explicitly inform the RM about freed slots;
> the heartbeat slot report is just a safety net.
> I'm not sure whether slot requests are able to overtake a slot release;
> @till do you have thoughts on that?
> As for the race condition between the requirements reduction and slot
> release, if we run into problems we have the backup plan of only releasing
> the slot after the requirement reduction has been acknowledged.
>
> On 26/08/2020 10:31, Xintong Song wrote:
>
> Thanks for preparing the FLIP and driving this discussion, @Chesnay & @Till.
>
> I really like the idea. I see a great value in the proposed declarative
> resource management, in terms of flexibility, usability and efficiency.
>
> I have a few comments and questions regarding the FLIP design. In general,
> the protocol design makes good sense to me. My main concern is that it is
> not very clear to me what changes are required from the
> Resource/SlotManager side to adapt to the new protocol.
>
> *1. Distributed slots across different jobs*
>
> Jobs which register their requirements first, will have precedence over
>
> other jobs also if the requirements change during the runtime.
>
> Just trying to understand, does this mean jobs are prioritized by the order
> of their first resource declaring?
>
> *2. AllocationID*
>
> Is this FLIP suggesting to completely remove AllocationID?
>
> I'm fine with this change. It seems where AllocationID is used can either
> be removed or be replaced by JobID. This reflects the concept that slots
> are now assigned to a job instead of its individual slot requests.
>
> I would like to bring to attention that this also requires changes on the
> TM side, with respect to FLIP-56[1].
>
> In the context of dynamic slot allocation introduced by FLIP-56, slots do
> not pre-exist on TM and are dynamically created when RM calls
> TaskExecutorGateway.requestSlot. Since the slots do not pre-exist, nor
> their SlotIDs, RM requests slots from TM with a special SlotID (negative
> slot index). The semantic changes from "requesting the slot identified by
> the given SlotID" to "requesting a slot with the given resource profile".
> The AllocationID is used for identifying the dynamic slots in such cases.
>
> >From the perspective of FLIP-56 and fine grained resource management, I'm
> fine with removing AllocationID. In the meantime, we would need TM to
> recognize the special negative indexed SlotID and generate a new unique
> SlotID for identifying the slot.
>
> *3. Minimum resource requirement*
>
> However, we can let the JobMaster know if we cannot fulfill the minimum
>
> resource requirement for a job after
> resourcemanager.standalone.start-up-time has passed.
>
> What is the "minimum resource requirement for a job"? Did I overlook
> anything?
>
> *4. Offer/free slots between JM/TM*
>
> This probably deserves a separate discussion thread. Just want to bring it
> up.
>
> The idea has been coming to me for quite some time. Is this design, that JM
> requests resources from RM while accepting/releasing resources from/to TM,
> the right thing?
>
> The pain point is that events of JM's activities (requesting/releasing
> resources) arrive at RM out of order. This leads to several problems.
>
>    - When a job fails and task cancelation takes long, some of the slots
>    might be released from the slot pool due to being unused for a while. Then
>    the job restarts and requests these slots again. At this time, RM may
>    receive slot requests before noticing from TM heartbeats that previous
>    slots are released, thus requesting new resources. I've seen many times
>    that the Yarn cluster has a heavy load and is not allocating resources
>    quickly enough, which leads to slot request timeout and job failover, and
>    during the failover more resources are requested which adds more load to
>    the Yarn cluster. Happily, this should be improved with the declarative
>    resource management. :)
>    - As described in this FLIP, it is possible that RM learns the releasing
>    of slots from TM heartbeat before noticing the resource requirement
>    decreasing, it may allocate more resources which need to be released soon.
>    - It complicates the ResourceManager/SlotManager, by requiring an
>    additional slot state PENDING, which means the slot is assigned by RM but
>    is not confirmed successfully ordered by TM.
>
> Why not just make RM offer the allocated resources (TM address, SlotID,
> etc.) to JM, and JM release resources to RM? So that for all the resource
> management JM talks to RM, and for the task deployment and execution it
> talks to TM?
>
> I tried to understand the benefits for having the current design, and found
> the following in FLIP-6[2].
>
>
> All that the ResourceManager does is negotiate between the
> cluster-manager, the JobManager, and the TaskManagers. Its state can hence
> be reconstructed from re-acquiring containers and re-registration from
> JobManagers and TaskManagers
>
> Correct me if I'm wrong, it seems the original purpose is to make sure the
> assignment between jobs and slots are confirmed between JM and TMs, so that
> failures of RM will not lead to any inconsistency. However, this only
> benefits scenarios where RM fails while JM and TMs live. Currently, JM and
> RM are in the same process. We do not really have any scenario where RM
> fails alone. We might separate JM and RM to different processes in future,
> but as far as I can see we don't have such requirements at the moment. It
> seems to me that we are suffering the current problems, complying to
> potential future benefits.
>
> Maybe I overlooked something.
>
> *5. Implementation Plan*
>
> For SlotPool, it sounds quite straightforward to "aggregate individual slot
> requests".
>
> For Resource/SlotManager, it seems there are quite a lot changes needed,
> with the removal of individual slot requests and AllocationID. It's not
> clear to me what is the first step plan for RM/SM? Do we internally treat
> the resource requirements as individual slot requests as the first step, so
> only the interfaces are changed? Or do we actually change (practically
> re-write) the slot allocation logics?
>
> Thank you~
>
> Xintong Song
>
>
> [1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> [2]https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>
> On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <[hidden email]> <[hidden email]> wrote:
>
>
> Hello,
>
> in FLIP-138 we want to rework the way the JobMaster acquires slots, such
> that required resources are declared before a job is scheduled and th
> job execution is adjusted according to the provided resources (e.g.,
> reducing parallelism), instead of asking for a fixed number of resources
> during scheduling and failing midway through if not enough resources are
> available.
>
> This is a stepping stone towards reactive mode, where Flink will
> automatically make use of new TaskExecutors being started.
>
> More details can be found here
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
>
> .
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-138: Declarative Resource management

Zhu Zhu
Thanks Chesnay&Till for proposing this improvement.
It's of good value to allow jobs to make best use of available resources
adaptively. Not
to mention it further supports reactive mode.
So big +1 for it.

I have a minor concern about possible regression in certain cases due to
the proposed
JobVertex-wise scheduling which replaces current ExecutionVertex-wise
scheduling.
In the proposal, looks to me it requires a stage to finish before its
consumer stage can be
scheduled. This limitation, however, does not exist in current scheduler.
In the case that there
exists a POINTWISE BLOCKING edge, the downstream execution region can be
scheduled
right after its connected upstream execution vertices finishes, even before
the whole upstream
stage finishes. This allows the region to be launched earlier and make use
of available resources.
Do we need to let the new scheduler retain this property?

Thanks,
Zhu

Xintong Song <[hidden email]> 于2020年8月26日周三 下午6:59写道:

> Thanks for the quick response.
>
> *Job prioritization, Allocation IDs, Minimum resource
> requirements, SlotManager Implementation Plan:* Sounds good to me.
>
> *FLIP-56*
> Good point about the trade-off. I believe maximum resource utilization and
> quick deployment are desired in different scenarios. E.g., a long running
> streaming job deserves some deployment latency to improve the resource
> utilization, which benefits the entire lifecycle of the job. On the other
> hand, short batch queries may prefer quick deployment, otherwise the time
> for resource allocation might significantly increase the response time.
> It would be good enough for me to bring these questions to attention.
> Nothing that I'm aware of should block this FLIP.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <[hidden email]>
> wrote:
>
> > Thank you Xintong for your questions!
> > Job prioritization
> > Yes, the job which declares it's initial requirements first is
> prioritized.
> > This is very much for simplicity; for example this avoids the nasty case
> > where all jobs get some resources, but none get enough to actually run
> the
> > job.
> > Minimum resource requirements
> >
> > My bad; at some point we want to allow the JobMaster to declare a range
> of
> > resources it could use to run a job, for example min=1, target=10,
> > max=+inf.
> >
> > With this model, the RM would then try to balance the resources such that
> > as many jobs as possible are as close to the target state as possible.
> >
> > Currently, the minimum/target/maximum resources are all the same. So the
> > notification is sent whenever the current requirements cannot be met.
> > Allocation IDs
> > We do intend to, at the very least, remove AllocationIDs on the
> > SlotManager side, as they are just not required there.
> >
> > On the slotpool side we have to keep them around at least until the
> > existing Slotpool implementations are removed (not sure whether we'll
> fully
> > commit to this in 1.12), since the interfaces use AllocationIDs, which
> also
> > bleed into the JobMaster.
> > The TaskExecutor is in a similar position.
> > But in the long-term, yes they will be removed, and most usages will
> > probably be replaced by the SlotID.
> > FLIP-56
> >
> > Dynamic slot allocations are indeed quite interesting and raise a few
> > questions; for example, the main purpose of it is to ensure maximum
> > resource utilization. In that case, should the JobMaster be allowed to
> > re-use a slot it if the task requires less resources than the slot
> > provides, or should it always request a new slot that exactly matches?
> >
> > There is a trade-off to be made between maximum resource utilization
> > (request exactly matching slots, and only re-use exact matches) and
> quicker
> > job deployment (re-use slot even if they don't exactly match, skip
> > round-trip to RM).
> >
> > As for how to handle the lack of a preemptively known SlotIDs, that
> should
> > be fine in and of itself; we already handle a similar case when we
> request
> > a new TaskExecutor to be started. So long as there is some way to know
> how
> > many resources the TaskExecutor has in total I do not see a problem at
> the
> > moment. We will get the SlotID eventually by virtue of the heartbeat
> > SlotReport.
> > Implementation plan (SlotManager)
> > You are on the right track. The SlotManager tracks the declared resource
> > requirements, and if the requirements increased it creates a SlotRequest,
> > which then goes through similar code paths as we have at the moment (try
> to
> > find a free slot, if found tell the TM, otherwise try to request new TM).
> > The SlotManager changes are not that substantial to get a working
> version;
> > we have a PoC and most of the work went into refactoring the SlotManager
> > into a more manageable state. (split into several components, stricter
> and
> > simplified Slot life-cycle, ...).
> > Offer/free slots between JM/TM
> > Gotta run, but that's a good question and I'll think about. But I think
> it
> > comes down to making less changes, and being able to leverage existing
> > reconciliation protocols.
> > Do note that TaskExecutor also explicitly inform the RM about freed
> slots;
> > the heartbeat slot report is just a safety net.
> > I'm not sure whether slot requests are able to overtake a slot release;
> > @till do you have thoughts on that?
> > As for the race condition between the requirements reduction and slot
> > release, if we run into problems we have the backup plan of only
> releasing
> > the slot after the requirement reduction has been acknowledged.
> >
> > On 26/08/2020 10:31, Xintong Song wrote:
> >
> > Thanks for preparing the FLIP and driving this discussion, @Chesnay &
> @Till.
> >
> > I really like the idea. I see a great value in the proposed declarative
> > resource management, in terms of flexibility, usability and efficiency.
> >
> > I have a few comments and questions regarding the FLIP design. In
> general,
> > the protocol design makes good sense to me. My main concern is that it is
> > not very clear to me what changes are required from the
> > Resource/SlotManager side to adapt to the new protocol.
> >
> > *1. Distributed slots across different jobs*
> >
> > Jobs which register their requirements first, will have precedence over
> >
> > other jobs also if the requirements change during the runtime.
> >
> > Just trying to understand, does this mean jobs are prioritized by the
> order
> > of their first resource declaring?
> >
> > *2. AllocationID*
> >
> > Is this FLIP suggesting to completely remove AllocationID?
> >
> > I'm fine with this change. It seems where AllocationID is used can either
> > be removed or be replaced by JobID. This reflects the concept that slots
> > are now assigned to a job instead of its individual slot requests.
> >
> > I would like to bring to attention that this also requires changes on the
> > TM side, with respect to FLIP-56[1].
> >
> > In the context of dynamic slot allocation introduced by FLIP-56, slots do
> > not pre-exist on TM and are dynamically created when RM calls
> > TaskExecutorGateway.requestSlot. Since the slots do not pre-exist, nor
> > their SlotIDs, RM requests slots from TM with a special SlotID (negative
> > slot index). The semantic changes from "requesting the slot identified by
> > the given SlotID" to "requesting a slot with the given resource profile".
> > The AllocationID is used for identifying the dynamic slots in such cases.
> >
> > >From the perspective of FLIP-56 and fine grained resource management,
> I'm
> > fine with removing AllocationID. In the meantime, we would need TM to
> > recognize the special negative indexed SlotID and generate a new unique
> > SlotID for identifying the slot.
> >
> > *3. Minimum resource requirement*
> >
> > However, we can let the JobMaster know if we cannot fulfill the minimum
> >
> > resource requirement for a job after
> > resourcemanager.standalone.start-up-time has passed.
> >
> > What is the "minimum resource requirement for a job"? Did I overlook
> > anything?
> >
> > *4. Offer/free slots between JM/TM*
> >
> > This probably deserves a separate discussion thread. Just want to bring
> it
> > up.
> >
> > The idea has been coming to me for quite some time. Is this design, that
> JM
> > requests resources from RM while accepting/releasing resources from/to
> TM,
> > the right thing?
> >
> > The pain point is that events of JM's activities (requesting/releasing
> > resources) arrive at RM out of order. This leads to several problems.
> >
> >    - When a job fails and task cancelation takes long, some of the slots
> >    might be released from the slot pool due to being unused for a while.
> Then
> >    the job restarts and requests these slots again. At this time, RM may
> >    receive slot requests before noticing from TM heartbeats that previous
> >    slots are released, thus requesting new resources. I've seen many
> times
> >    that the Yarn cluster has a heavy load and is not allocating resources
> >    quickly enough, which leads to slot request timeout and job failover,
> and
> >    during the failover more resources are requested which adds more load
> to
> >    the Yarn cluster. Happily, this should be improved with the
> declarative
> >    resource management. :)
> >    - As described in this FLIP, it is possible that RM learns the
> releasing
> >    of slots from TM heartbeat before noticing the resource requirement
> >    decreasing, it may allocate more resources which need to be released
> soon.
> >    - It complicates the ResourceManager/SlotManager, by requiring an
> >    additional slot state PENDING, which means the slot is assigned by RM
> but
> >    is not confirmed successfully ordered by TM.
> >
> > Why not just make RM offer the allocated resources (TM address, SlotID,
> > etc.) to JM, and JM release resources to RM? So that for all the resource
> > management JM talks to RM, and for the task deployment and execution it
> > talks to TM?
> >
> > I tried to understand the benefits for having the current design, and
> found
> > the following in FLIP-6[2].
> >
> >
> > All that the ResourceManager does is negotiate between the
> > cluster-manager, the JobManager, and the TaskManagers. Its state can
> hence
> > be reconstructed from re-acquiring containers and re-registration from
> > JobManagers and TaskManagers
> >
> > Correct me if I'm wrong, it seems the original purpose is to make sure
> the
> > assignment between jobs and slots are confirmed between JM and TMs, so
> that
> > failures of RM will not lead to any inconsistency. However, this only
> > benefits scenarios where RM fails while JM and TMs live. Currently, JM
> and
> > RM are in the same process. We do not really have any scenario where RM
> > fails alone. We might separate JM and RM to different processes in
> future,
> > but as far as I can see we don't have such requirements at the moment. It
> > seems to me that we are suffering the current problems, complying to
> > potential future benefits.
> >
> > Maybe I overlooked something.
> >
> > *5. Implementation Plan*
> >
> > For SlotPool, it sounds quite straightforward to "aggregate individual
> slot
> > requests".
> >
> > For Resource/SlotManager, it seems there are quite a lot changes needed,
> > with the removal of individual slot requests and AllocationID. It's not
> > clear to me what is the first step plan for RM/SM? Do we internally treat
> > the resource requirements as individual slot requests as the first step,
> so
> > only the interfaces are changed? Or do we actually change (practically
> > re-write) the slot allocation logics?
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> > [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
> >
> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <[hidden email]> <
> [hidden email]> wrote:
> >
> >
> > Hello,
> >
> > in FLIP-138 we want to rework the way the JobMaster acquires slots, such
> > that required resources are declared before a job is scheduled and th
> > job execution is adjusted according to the provided resources (e.g.,
> > reducing parallelism), instead of asking for a fixed number of resources
> > during scheduling and failing midway through if not enough resources are
> > available.
> >
> > This is a stepping stone towards reactive mode, where Flink will
> > automatically make use of new TaskExecutors being started.
> >
> > More details can be found here
> > <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
> >
> > .
> >
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-138: Declarative Resource management

Chesnay Schepler-3
The scheduler doesn't have to wait for one stage to finish. It is still
aware that the upstream execution vertex has finished, and can
request/use slots accordingly to schedule the consumer.

This will get more complicated once we allow the scheduler to change the
parallelism while the job is running, for which we will need some
enhancements to the network stack to allow the producer to run without
knowing the consumer parallelism ahead of time. I'm not too clear on the
details, but we'll some form of keygroup-like approach for sub
partitions (maxParallelism and all that).


On 27/08/2020 20:05, Zhu Zhu wrote:

> Thanks Chesnay&Till for proposing this improvement.
> It's of good value to allow jobs to make best use of available
> resources adaptively. Not
> to mention it further supports reactive mode.
> So big +1 for it.
>
> I have a minor concern about possible regression in certain cases due
> to the proposed
> JobVertex-wise scheduling which replaces current ExecutionVertex-wise
> scheduling.
> In the proposal, looks to me it requires a stage to finish before its
> consumer stage can be
> scheduled. This limitation, however, does not exist in current
> scheduler. In the case that there
> exists a POINTWISE BLOCKING edge, the downstream execution region can
> be scheduled
> right after its connected upstream execution vertices finishes, even
> before the whole upstream
> stage finishes. This allows the region to be launched earlier and make
> use of available resources.
> Do we need to let the new scheduler retain this property?
>
> Thanks,
> Zhu
>
> Xintong Song <[hidden email] <mailto:[hidden email]>>
> 于2020年8月26日周三 下午6:59写道:
>
>     Thanks for the quick response.
>
>     *Job prioritization, Allocation IDs, Minimum resource
>     requirements, SlotManager Implementation Plan:* Sounds good to me.
>
>     *FLIP-56*
>     Good point about the trade-off. I believe maximum resource
>     utilization and
>     quick deployment are desired in different scenarios. E.g., a long
>     running
>     streaming job deserves some deployment latency to improve the resource
>     utilization, which benefits the entire lifecycle of the job. On
>     the other
>     hand, short batch queries may prefer quick deployment, otherwise
>     the time
>     for resource allocation might significantly increase the response
>     time.
>     It would be good enough for me to bring these questions to attention.
>     Nothing that I'm aware of should block this FLIP.
>
>     Thank you~
>
>     Xintong Song
>
>
>
>     On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler
>     <[hidden email] <mailto:[hidden email]>> wrote:
>
>     > Thank you Xintong for your questions!
>     > Job prioritization
>     > Yes, the job which declares it's initial requirements first is
>     prioritized.
>     > This is very much for simplicity; for example this avoids the
>     nasty case
>     > where all jobs get some resources, but none get enough to
>     actually run the
>     > job.
>     > Minimum resource requirements
>     >
>     > My bad; at some point we want to allow the JobMaster to declare
>     a range of
>     > resources it could use to run a job, for example min=1, target=10,
>     > max=+inf.
>     >
>     > With this model, the RM would then try to balance the resources
>     such that
>     > as many jobs as possible are as close to the target state as
>     possible.
>     >
>     > Currently, the minimum/target/maximum resources are all the
>     same. So the
>     > notification is sent whenever the current requirements cannot be
>     met.
>     > Allocation IDs
>     > We do intend to, at the very least, remove AllocationIDs on the
>     > SlotManager side, as they are just not required there.
>     >
>     > On the slotpool side we have to keep them around at least until the
>     > existing Slotpool implementations are removed (not sure whether
>     we'll fully
>     > commit to this in 1.12), since the interfaces use AllocationIDs,
>     which also
>     > bleed into the JobMaster.
>     > The TaskExecutor is in a similar position.
>     > But in the long-term, yes they will be removed, and most usages will
>     > probably be replaced by the SlotID.
>     > FLIP-56
>     >
>     > Dynamic slot allocations are indeed quite interesting and raise
>     a few
>     > questions; for example, the main purpose of it is to ensure maximum
>     > resource utilization. In that case, should the JobMaster be
>     allowed to
>     > re-use a slot it if the task requires less resources than the slot
>     > provides, or should it always request a new slot that exactly
>     matches?
>     >
>     > There is a trade-off to be made between maximum resource utilization
>     > (request exactly matching slots, and only re-use exact matches)
>     and quicker
>     > job deployment (re-use slot even if they don't exactly match, skip
>     > round-trip to RM).
>     >
>     > As for how to handle the lack of a preemptively known SlotIDs,
>     that should
>     > be fine in and of itself; we already handle a similar case when
>     we request
>     > a new TaskExecutor to be started. So long as there is some way
>     to know how
>     > many resources the TaskExecutor has in total I do not see a
>     problem at the
>     > moment. We will get the SlotID eventually by virtue of the heartbeat
>     > SlotReport.
>     > Implementation plan (SlotManager)
>     > You are on the right track. The SlotManager tracks the declared
>     resource
>     > requirements, and if the requirements increased it creates a
>     SlotRequest,
>     > which then goes through similar code paths as we have at the
>     moment (try to
>     > find a free slot, if found tell the TM, otherwise try to request
>     new TM).
>     > The SlotManager changes are not that substantial to get a
>     working version;
>     > we have a PoC and most of the work went into refactoring the
>     SlotManager
>     > into a more manageable state. (split into several components,
>     stricter and
>     > simplified Slot life-cycle, ...).
>     > Offer/free slots between JM/TM
>     > Gotta run, but that's a good question and I'll think about. But
>     I think it
>     > comes down to making less changes, and being able to leverage
>     existing
>     > reconciliation protocols.
>     > Do note that TaskExecutor also explicitly inform the RM about
>     freed slots;
>     > the heartbeat slot report is just a safety net.
>     > I'm not sure whether slot requests are able to overtake a slot
>     release;
>     > @till do you have thoughts on that?
>     > As for the race condition between the requirements reduction and
>     slot
>     > release, if we run into problems we have the backup plan of only
>     releasing
>     > the slot after the requirement reduction has been acknowledged.
>     >
>     > On 26/08/2020 10:31, Xintong Song wrote:
>     >
>     > Thanks for preparing the FLIP and driving this discussion,
>     @Chesnay & @Till.
>     >
>     > I really like the idea. I see a great value in the proposed
>     declarative
>     > resource management, in terms of flexibility, usability and
>     efficiency.
>     >
>     > I have a few comments and questions regarding the FLIP design.
>     In general,
>     > the protocol design makes good sense to me. My main concern is
>     that it is
>     > not very clear to me what changes are required from the
>     > Resource/SlotManager side to adapt to the new protocol.
>     >
>     > *1. Distributed slots across different jobs*
>     >
>     > Jobs which register their requirements first, will have
>     precedence over
>     >
>     > other jobs also if the requirements change during the runtime.
>     >
>     > Just trying to understand, does this mean jobs are prioritized
>     by the order
>     > of their first resource declaring?
>     >
>     > *2. AllocationID*
>     >
>     > Is this FLIP suggesting to completely remove AllocationID?
>     >
>     > I'm fine with this change. It seems where AllocationID is used
>     can either
>     > be removed or be replaced by JobID. This reflects the concept
>     that slots
>     > are now assigned to a job instead of its individual slot requests.
>     >
>     > I would like to bring to attention that this also requires
>     changes on the
>     > TM side, with respect to FLIP-56[1].
>     >
>     > In the context of dynamic slot allocation introduced by FLIP-56,
>     slots do
>     > not pre-exist on TM and are dynamically created when RM calls
>     > TaskExecutorGateway.requestSlot. Since the slots do not
>     pre-exist, nor
>     > their SlotIDs, RM requests slots from TM with a special SlotID
>     (negative
>     > slot index). The semantic changes from "requesting the slot
>     identified by
>     > the given SlotID" to "requesting a slot with the given resource
>     profile".
>     > The AllocationID is used for identifying the dynamic slots in
>     such cases.
>     >
>     > >From the perspective of FLIP-56 and fine grained resource
>     management, I'm
>     > fine with removing AllocationID. In the meantime, we would need
>     TM to
>     > recognize the special negative indexed SlotID and generate a new
>     unique
>     > SlotID for identifying the slot.
>     >
>     > *3. Minimum resource requirement*
>     >
>     > However, we can let the JobMaster know if we cannot fulfill the
>     minimum
>     >
>     > resource requirement for a job after
>     > resourcemanager.standalone.start-up-time has passed.
>     >
>     > What is the "minimum resource requirement for a job"? Did I overlook
>     > anything?
>     >
>     > *4. Offer/free slots between JM/TM*
>     >
>     > This probably deserves a separate discussion thread. Just want
>     to bring it
>     > up.
>     >
>     > The idea has been coming to me for quite some time. Is this
>     design, that JM
>     > requests resources from RM while accepting/releasing resources
>     from/to TM,
>     > the right thing?
>     >
>     > The pain point is that events of JM's activities
>     (requesting/releasing
>     > resources) arrive at RM out of order. This leads to several
>     problems.
>     >
>     >    - When a job fails and task cancelation takes long, some of
>     the slots
>     >    might be released from the slot pool due to being unused for
>     a while. Then
>     >    the job restarts and requests these slots again. At this
>     time, RM may
>     >    receive slot requests before noticing from TM heartbeats that
>     previous
>     >    slots are released, thus requesting new resources. I've seen
>     many times
>     >    that the Yarn cluster has a heavy load and is not allocating
>     resources
>     >    quickly enough, which leads to slot request timeout and job
>     failover, and
>     >    during the failover more resources are requested which adds
>     more load to
>     >    the Yarn cluster. Happily, this should be improved with the
>     declarative
>     >    resource management. :)
>     >    - As described in this FLIP, it is possible that RM learns
>     the releasing
>     >    of slots from TM heartbeat before noticing the resource
>     requirement
>     >    decreasing, it may allocate more resources which need to be
>     released soon.
>     >    - It complicates the ResourceManager/SlotManager, by requiring an
>     >    additional slot state PENDING, which means the slot is
>     assigned by RM but
>     >    is not confirmed successfully ordered by TM.
>     >
>     > Why not just make RM offer the allocated resources (TM address,
>     SlotID,
>     > etc.) to JM, and JM release resources to RM? So that for all the
>     resource
>     > management JM talks to RM, and for the task deployment and
>     execution it
>     > talks to TM?
>     >
>     > I tried to understand the benefits for having the current
>     design, and found
>     > the following in FLIP-6[2].
>     >
>     >
>     > All that the ResourceManager does is negotiate between the
>     > cluster-manager, the JobManager, and the TaskManagers. Its state
>     can hence
>     > be reconstructed from re-acquiring containers and
>     re-registration from
>     > JobManagers and TaskManagers
>     >
>     > Correct me if I'm wrong, it seems the original purpose is to
>     make sure the
>     > assignment between jobs and slots are confirmed between JM and
>     TMs, so that
>     > failures of RM will not lead to any inconsistency. However, this
>     only
>     > benefits scenarios where RM fails while JM and TMs live.
>     Currently, JM and
>     > RM are in the same process. We do not really have any scenario
>     where RM
>     > fails alone. We might separate JM and RM to different processes
>     in future,
>     > but as far as I can see we don't have such requirements at the
>     moment. It
>     > seems to me that we are suffering the current problems, complying to
>     > potential future benefits.
>     >
>     > Maybe I overlooked something.
>     >
>     > *5. Implementation Plan*
>     >
>     > For SlotPool, it sounds quite straightforward to "aggregate
>     individual slot
>     > requests".
>     >
>     > For Resource/SlotManager, it seems there are quite a lot changes
>     needed,
>     > with the removal of individual slot requests and AllocationID.
>     It's not
>     > clear to me what is the first step plan for RM/SM? Do we
>     internally treat
>     > the resource requirements as individual slot requests as the
>     first step, so
>     > only the interfaces are changed? Or do we actually change
>     (practically
>     > re-write) the slot allocation logics?
>     >
>     > Thank you~
>     >
>     > Xintong Song
>     >
>     >
>     >
>     [1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
>     >
>     [2]https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>     >
>     > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler
>     <[hidden email] <mailto:[hidden email]>>
>     <[hidden email] <mailto:[hidden email]>> wrote:
>     >
>     >
>     > Hello,
>     >
>     > in FLIP-138 we want to rework the way the JobMaster acquires
>     slots, such
>     > that required resources are declared before a job is scheduled
>     and th
>     > job execution is adjusted according to the provided resources (e.g.,
>     > reducing parallelism), instead of asking for a fixed number of
>     resources
>     > during scheduling and failing midway through if not enough
>     resources are
>     > available.
>     >
>     > This is a stepping stone towards reactive mode, where Flink will
>     > automatically make use of new TaskExecutors being started.
>     >
>     > More details can be found here
>     >
>     <https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
>     >
>     > .
>     >
>     >
>     >
>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-138: Declarative Resource management

Zhu Zhu
Thanks for the response!

>> The scheduler doesn't have to wait for one stage to finish
Does it mean we will declare resources and decide the parallelism for a
stage which is partially
schedulable, i.e. when input data are ready just for part of the execution
vertices?

>> This will get more complicated once we allow the scheduler to change the
parallelism while the job is running
Agreed. Looks to me it's a problem for batch jobs only and can be avoided
for streaming jobs.
Will this FLIP limit its scope to streaming jobs, and improvements for
batch jobs are to be done later?

Thanks,
Zhu

Chesnay Schepler <[hidden email]> 于2020年8月28日周五 上午2:27写道:

> The scheduler doesn't have to wait for one stage to finish. It is still
> aware that the upstream execution vertex has finished, and can request/use
> slots accordingly to schedule the consumer.
>
> This will get more complicated once we allow the scheduler to change the
> parallelism while the job is running, for which we will need some
> enhancements to the network stack to allow the producer to run without
> knowing the consumer parallelism ahead of time. I'm not too clear on the
> details, but we'll some form of keygroup-like approach for sub partitions
> (maxParallelism and all that).
>
> On 27/08/2020 20:05, Zhu Zhu wrote:
>
> Thanks Chesnay&Till for proposing this improvement.
> It's of good value to allow jobs to make best use of available resources
> adaptively. Not
> to mention it further supports reactive mode.
> So big +1 for it.
>
> I have a minor concern about possible regression in certain cases due to
> the proposed
> JobVertex-wise scheduling which replaces current ExecutionVertex-wise
> scheduling.
> In the proposal, looks to me it requires a stage to finish before its
> consumer stage can be
> scheduled. This limitation, however, does not exist in current scheduler.
> In the case that there
> exists a POINTWISE BLOCKING edge, the downstream execution region can be
> scheduled
> right after its connected upstream execution vertices finishes, even
> before the whole upstream
> stage finishes. This allows the region to be launched earlier and make use
> of available resources.
> Do we need to let the new scheduler retain this property?
>
> Thanks,
> Zhu
>
> Xintong Song <[hidden email]> 于2020年8月26日周三 下午6:59写道:
>
>> Thanks for the quick response.
>>
>> *Job prioritization, Allocation IDs, Minimum resource
>> requirements, SlotManager Implementation Plan:* Sounds good to me.
>>
>> *FLIP-56*
>> Good point about the trade-off. I believe maximum resource utilization and
>> quick deployment are desired in different scenarios. E.g., a long running
>> streaming job deserves some deployment latency to improve the resource
>> utilization, which benefits the entire lifecycle of the job. On the other
>> hand, short batch queries may prefer quick deployment, otherwise the time
>> for resource allocation might significantly increase the response time.
>> It would be good enough for me to bring these questions to attention.
>> Nothing that I'm aware of should block this FLIP.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <[hidden email]>
>> wrote:
>>
>> > Thank you Xintong for your questions!
>> > Job prioritization
>> > Yes, the job which declares it's initial requirements first is
>> prioritized.
>> > This is very much for simplicity; for example this avoids the nasty case
>> > where all jobs get some resources, but none get enough to actually run
>> the
>> > job.
>> > Minimum resource requirements
>> >
>> > My bad; at some point we want to allow the JobMaster to declare a range
>> of
>> > resources it could use to run a job, for example min=1, target=10,
>> > max=+inf.
>> >
>> > With this model, the RM would then try to balance the resources such
>> that
>> > as many jobs as possible are as close to the target state as possible.
>> >
>> > Currently, the minimum/target/maximum resources are all the same. So the
>> > notification is sent whenever the current requirements cannot be met.
>> > Allocation IDs
>> > We do intend to, at the very least, remove AllocationIDs on the
>> > SlotManager side, as they are just not required there.
>> >
>> > On the slotpool side we have to keep them around at least until the
>> > existing Slotpool implementations are removed (not sure whether we'll
>> fully
>> > commit to this in 1.12), since the interfaces use AllocationIDs, which
>> also
>> > bleed into the JobMaster.
>> > The TaskExecutor is in a similar position.
>> > But in the long-term, yes they will be removed, and most usages will
>> > probably be replaced by the SlotID.
>> > FLIP-56
>> >
>> > Dynamic slot allocations are indeed quite interesting and raise a few
>> > questions; for example, the main purpose of it is to ensure maximum
>> > resource utilization. In that case, should the JobMaster be allowed to
>> > re-use a slot it if the task requires less resources than the slot
>> > provides, or should it always request a new slot that exactly matches?
>> >
>> > There is a trade-off to be made between maximum resource utilization
>> > (request exactly matching slots, and only re-use exact matches) and
>> quicker
>> > job deployment (re-use slot even if they don't exactly match, skip
>> > round-trip to RM).
>> >
>> > As for how to handle the lack of a preemptively known SlotIDs, that
>> should
>> > be fine in and of itself; we already handle a similar case when we
>> request
>> > a new TaskExecutor to be started. So long as there is some way to know
>> how
>> > many resources the TaskExecutor has in total I do not see a problem at
>> the
>> > moment. We will get the SlotID eventually by virtue of the heartbeat
>> > SlotReport.
>> > Implementation plan (SlotManager)
>> > You are on the right track. The SlotManager tracks the declared resource
>> > requirements, and if the requirements increased it creates a
>> SlotRequest,
>> > which then goes through similar code paths as we have at the moment
>> (try to
>> > find a free slot, if found tell the TM, otherwise try to request new
>> TM).
>> > The SlotManager changes are not that substantial to get a working
>> version;
>> > we have a PoC and most of the work went into refactoring the SlotManager
>> > into a more manageable state. (split into several components, stricter
>> and
>> > simplified Slot life-cycle, ...).
>> > Offer/free slots between JM/TM
>> > Gotta run, but that's a good question and I'll think about. But I think
>> it
>> > comes down to making less changes, and being able to leverage existing
>> > reconciliation protocols.
>> > Do note that TaskExecutor also explicitly inform the RM about freed
>> slots;
>> > the heartbeat slot report is just a safety net.
>> > I'm not sure whether slot requests are able to overtake a slot release;
>> > @till do you have thoughts on that?
>> > As for the race condition between the requirements reduction and slot
>> > release, if we run into problems we have the backup plan of only
>> releasing
>> > the slot after the requirement reduction has been acknowledged.
>> >
>> > On 26/08/2020 10:31, Xintong Song wrote:
>> >
>> > Thanks for preparing the FLIP and driving this discussion, @Chesnay &
>> @Till.
>> >
>> > I really like the idea. I see a great value in the proposed declarative
>> > resource management, in terms of flexibility, usability and efficiency.
>> >
>> > I have a few comments and questions regarding the FLIP design. In
>> general,
>> > the protocol design makes good sense to me. My main concern is that it
>> is
>> > not very clear to me what changes are required from the
>> > Resource/SlotManager side to adapt to the new protocol.
>> >
>> > *1. Distributed slots across different jobs*
>> >
>> > Jobs which register their requirements first, will have precedence over
>> >
>> > other jobs also if the requirements change during the runtime.
>> >
>> > Just trying to understand, does this mean jobs are prioritized by the
>> order
>> > of their first resource declaring?
>> >
>> > *2. AllocationID*
>> >
>> > Is this FLIP suggesting to completely remove AllocationID?
>> >
>> > I'm fine with this change. It seems where AllocationID is used can
>> either
>> > be removed or be replaced by JobID. This reflects the concept that slots
>> > are now assigned to a job instead of its individual slot requests.
>> >
>> > I would like to bring to attention that this also requires changes on
>> the
>> > TM side, with respect to FLIP-56[1].
>> >
>> > In the context of dynamic slot allocation introduced by FLIP-56, slots
>> do
>> > not pre-exist on TM and are dynamically created when RM calls
>> > TaskExecutorGateway.requestSlot. Since the slots do not pre-exist, nor
>> > their SlotIDs, RM requests slots from TM with a special SlotID (negative
>> > slot index). The semantic changes from "requesting the slot identified
>> by
>> > the given SlotID" to "requesting a slot with the given resource
>> profile".
>> > The AllocationID is used for identifying the dynamic slots in such
>> cases.
>> >
>> > >From the perspective of FLIP-56 and fine grained resource management,
>> I'm
>> > fine with removing AllocationID. In the meantime, we would need TM to
>> > recognize the special negative indexed SlotID and generate a new unique
>> > SlotID for identifying the slot.
>> >
>> > *3. Minimum resource requirement*
>> >
>> > However, we can let the JobMaster know if we cannot fulfill the minimum
>> >
>> > resource requirement for a job after
>> > resourcemanager.standalone.start-up-time has passed.
>> >
>> > What is the "minimum resource requirement for a job"? Did I overlook
>> > anything?
>> >
>> > *4. Offer/free slots between JM/TM*
>> >
>> > This probably deserves a separate discussion thread. Just want to bring
>> it
>> > up.
>> >
>> > The idea has been coming to me for quite some time. Is this design,
>> that JM
>> > requests resources from RM while accepting/releasing resources from/to
>> TM,
>> > the right thing?
>> >
>> > The pain point is that events of JM's activities (requesting/releasing
>> > resources) arrive at RM out of order. This leads to several problems.
>> >
>> >    - When a job fails and task cancelation takes long, some of the slots
>> >    might be released from the slot pool due to being unused for a
>> while. Then
>> >    the job restarts and requests these slots again. At this time, RM may
>> >    receive slot requests before noticing from TM heartbeats that
>> previous
>> >    slots are released, thus requesting new resources. I've seen many
>> times
>> >    that the Yarn cluster has a heavy load and is not allocating
>> resources
>> >    quickly enough, which leads to slot request timeout and job
>> failover, and
>> >    during the failover more resources are requested which adds more
>> load to
>> >    the Yarn cluster. Happily, this should be improved with the
>> declarative
>> >    resource management. :)
>> >    - As described in this FLIP, it is possible that RM learns the
>> releasing
>> >    of slots from TM heartbeat before noticing the resource requirement
>> >    decreasing, it may allocate more resources which need to be released
>> soon.
>> >    - It complicates the ResourceManager/SlotManager, by requiring an
>> >    additional slot state PENDING, which means the slot is assigned by
>> RM but
>> >    is not confirmed successfully ordered by TM.
>> >
>> > Why not just make RM offer the allocated resources (TM address, SlotID,
>> > etc.) to JM, and JM release resources to RM? So that for all the
>> resource
>> > management JM talks to RM, and for the task deployment and execution it
>> > talks to TM?
>> >
>> > I tried to understand the benefits for having the current design, and
>> found
>> > the following in FLIP-6[2].
>> >
>> >
>> > All that the ResourceManager does is negotiate between the
>> > cluster-manager, the JobManager, and the TaskManagers. Its state can
>> hence
>> > be reconstructed from re-acquiring containers and re-registration from
>> > JobManagers and TaskManagers
>> >
>> > Correct me if I'm wrong, it seems the original purpose is to make sure
>> the
>> > assignment between jobs and slots are confirmed between JM and TMs, so
>> that
>> > failures of RM will not lead to any inconsistency. However, this only
>> > benefits scenarios where RM fails while JM and TMs live. Currently, JM
>> and
>> > RM are in the same process. We do not really have any scenario where RM
>> > fails alone. We might separate JM and RM to different processes in
>> future,
>> > but as far as I can see we don't have such requirements at the moment.
>> It
>> > seems to me that we are suffering the current problems, complying to
>> > potential future benefits.
>> >
>> > Maybe I overlooked something.
>> >
>> > *5. Implementation Plan*
>> >
>> > For SlotPool, it sounds quite straightforward to "aggregate individual
>> slot
>> > requests".
>> >
>> > For Resource/SlotManager, it seems there are quite a lot changes needed,
>> > with the removal of individual slot requests and AllocationID. It's not
>> > clear to me what is the first step plan for RM/SM? Do we internally
>> treat
>> > the resource requirements as individual slot requests as the first
>> step, so
>> > only the interfaces are changed? Or do we actually change (practically
>> > re-write) the slot allocation logics?
>> >
>> > Thank you~
>> >
>> > Xintong Song
>> >
>> >
>> > [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
>> > [2]
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>> >
>> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <[hidden email]> <
>> [hidden email]> wrote:
>> >
>> >
>> > Hello,
>> >
>> > in FLIP-138 we want to rework the way the JobMaster acquires slots, such
>> > that required resources are declared before a job is scheduled and th
>> > job execution is adjusted according to the provided resources (e.g.,
>> > reducing parallelism), instead of asking for a fixed number of resources
>> > during scheduling and failing midway through if not enough resources are
>> > available.
>> >
>> > This is a stepping stone towards reactive mode, where Flink will
>> > automatically make use of new TaskExecutors being started.
>> >
>> > More details can be found here
>> > <
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
>> >
>> > .
>> >
>> >
>> >
>>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-138: Declarative Resource management

Chesnay Schepler-3
Maybe :)

Imagine a case where the producer and consumer have the same
ResourceProfile, or at least one where the consumer requirements are
less than the producer ones.
In this case, the scheduler can happily schedule consumers, because it
knows it will get enough slots.

If the profiles are different, then the Scheduler _may_ wait
numberOf(producer) slots; it _may_ also stick with the parallelism and
schedule right away, in the worst case running the consumers in sequence.
In fact, for batch jobs there is probably(?) never a reason for the
scheduler to _reduce_ the parallelism; it can always try to run things
in sequence if it doesn't get enough slots.
Reducing the parallelism would just mean that you'd have to wait for
more producers to finish.

The scope of this FLIP is just the protocol, without changes to the
scheduler; in other words just changing how slots are acquired, but
change nothing about the scheduling. That is tackled in a follow-up FLIP.

On 28/08/2020 07:34, Zhu Zhu wrote:

> Thanks for the response!
>
> >> The scheduler doesn't have to wait for one stage to finish
> Does it mean we will declare resources and decide the parallelism for
> a stage which is partially
> schedulable, i.e. when input data are ready just for part of the
> execution vertices?
>
> >> This will get more complicated once we allow the scheduler to
> change the parallelism while the job is running
> Agreed. Looks to me it's a problem for batch jobs only and can be
> avoided for streaming jobs.
> Will this FLIP limit its scope to streaming jobs, and improvements for
> batch jobs are to be done later?
>
> Thanks,
> Zhu
>
> Chesnay Schepler <[hidden email] <mailto:[hidden email]>>
> 于2020年8月28日周五 上午2:27写道:
>
>     The scheduler doesn't have to wait for one stage to finish. It is
>     still aware that the upstream execution vertex has finished, and
>     can request/use slots accordingly to schedule the consumer.
>
>     This will get more complicated once we allow the scheduler to
>     change the parallelism while the job is running, for which we will
>     need some enhancements to the network stack to allow the producer
>     to run without knowing the consumer parallelism ahead of time. I'm
>     not too clear on the details, but we'll some form of keygroup-like
>     approach for sub partitions (maxParallelism and all that).
>
>
>     On 27/08/2020 20:05, Zhu Zhu wrote:
>>     Thanks Chesnay&Till for proposing this improvement.
>>     It's of good value to allow jobs to make best use of available
>>     resources adaptively. Not
>>     to mention it further supports reactive mode.
>>     So big +1 for it.
>>
>>     I have a minor concern about possible regression in certain cases
>>     due to the proposed
>>     JobVertex-wise scheduling which replaces current
>>     ExecutionVertex-wise scheduling.
>>     In the proposal, looks to me it requires a stage to finish before
>>     its consumer stage can be
>>     scheduled. This limitation, however, does not exist in current
>>     scheduler. In the case that there
>>     exists a POINTWISE BLOCKING edge, the downstream execution region
>>     can be scheduled
>>     right after its connected upstream execution vertices finishes,
>>     even before the whole upstream
>>     stage finishes. This allows the region to be launched earlier and
>>     make use of available resources.
>>     Do we need to let the new scheduler retain this property?
>>
>>     Thanks,
>>     Zhu
>>
>>     Xintong Song <[hidden email]
>>     <mailto:[hidden email]>> 于2020年8月26日周三 下午6:59写道:
>>
>>         Thanks for the quick response.
>>
>>         *Job prioritization, Allocation IDs, Minimum resource
>>         requirements, SlotManager Implementation Plan:* Sounds good
>>         to me.
>>
>>         *FLIP-56*
>>         Good point about the trade-off. I believe maximum resource
>>         utilization and
>>         quick deployment are desired in different scenarios. E.g., a
>>         long running
>>         streaming job deserves some deployment latency to improve the
>>         resource
>>         utilization, which benefits the entire lifecycle of the job.
>>         On the other
>>         hand, short batch queries may prefer quick deployment,
>>         otherwise the time
>>         for resource allocation might significantly increase the
>>         response time.
>>         It would be good enough for me to bring these questions to
>>         attention.
>>         Nothing that I'm aware of should block this FLIP.
>>
>>         Thank you~
>>
>>         Xintong Song
>>
>>
>>
>>         On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler
>>         <[hidden email] <mailto:[hidden email]>> wrote:
>>
>>         > Thank you Xintong for your questions!
>>         > Job prioritization
>>         > Yes, the job which declares it's initial requirements first
>>         is prioritized.
>>         > This is very much for simplicity; for example this avoids
>>         the nasty case
>>         > where all jobs get some resources, but none get enough to
>>         actually run the
>>         > job.
>>         > Minimum resource requirements
>>         >
>>         > My bad; at some point we want to allow the JobMaster to
>>         declare a range of
>>         > resources it could use to run a job, for example min=1,
>>         target=10,
>>         > max=+inf.
>>         >
>>         > With this model, the RM would then try to balance the
>>         resources such that
>>         > as many jobs as possible are as close to the target state
>>         as possible.
>>         >
>>         > Currently, the minimum/target/maximum resources are all the
>>         same. So the
>>         > notification is sent whenever the current requirements
>>         cannot be met.
>>         > Allocation IDs
>>         > We do intend to, at the very least, remove AllocationIDs on the
>>         > SlotManager side, as they are just not required there.
>>         >
>>         > On the slotpool side we have to keep them around at least
>>         until the
>>         > existing Slotpool implementations are removed (not sure
>>         whether we'll fully
>>         > commit to this in 1.12), since the interfaces use
>>         AllocationIDs, which also
>>         > bleed into the JobMaster.
>>         > The TaskExecutor is in a similar position.
>>         > But in the long-term, yes they will be removed, and most
>>         usages will
>>         > probably be replaced by the SlotID.
>>         > FLIP-56
>>         >
>>         > Dynamic slot allocations are indeed quite interesting and
>>         raise a few
>>         > questions; for example, the main purpose of it is to ensure
>>         maximum
>>         > resource utilization. In that case, should the JobMaster be
>>         allowed to
>>         > re-use a slot it if the task requires less resources than
>>         the slot
>>         > provides, or should it always request a new slot that
>>         exactly matches?
>>         >
>>         > There is a trade-off to be made between maximum resource
>>         utilization
>>         > (request exactly matching slots, and only re-use exact
>>         matches) and quicker
>>         > job deployment (re-use slot even if they don't exactly
>>         match, skip
>>         > round-trip to RM).
>>         >
>>         > As for how to handle the lack of a preemptively known
>>         SlotIDs, that should
>>         > be fine in and of itself; we already handle a similar case
>>         when we request
>>         > a new TaskExecutor to be started. So long as there is some
>>         way to know how
>>         > many resources the TaskExecutor has in total I do not see a
>>         problem at the
>>         > moment. We will get the SlotID eventually by virtue of the
>>         heartbeat
>>         > SlotReport.
>>         > Implementation plan (SlotManager)
>>         > You are on the right track. The SlotManager tracks the
>>         declared resource
>>         > requirements, and if the requirements increased it creates
>>         a SlotRequest,
>>         > which then goes through similar code paths as we have at
>>         the moment (try to
>>         > find a free slot, if found tell the TM, otherwise try to
>>         request new TM).
>>         > The SlotManager changes are not that substantial to get a
>>         working version;
>>         > we have a PoC and most of the work went into refactoring
>>         the SlotManager
>>         > into a more manageable state. (split into several
>>         components, stricter and
>>         > simplified Slot life-cycle, ...).
>>         > Offer/free slots between JM/TM
>>         > Gotta run, but that's a good question and I'll think about.
>>         But I think it
>>         > comes down to making less changes, and being able to
>>         leverage existing
>>         > reconciliation protocols.
>>         > Do note that TaskExecutor also explicitly inform the RM
>>         about freed slots;
>>         > the heartbeat slot report is just a safety net.
>>         > I'm not sure whether slot requests are able to overtake a
>>         slot release;
>>         > @till do you have thoughts on that?
>>         > As for the race condition between the requirements
>>         reduction and slot
>>         > release, if we run into problems we have the backup plan of
>>         only releasing
>>         > the slot after the requirement reduction has been acknowledged.
>>         >
>>         > On 26/08/2020 10:31, Xintong Song wrote:
>>         >
>>         > Thanks for preparing the FLIP and driving this discussion,
>>         @Chesnay & @Till.
>>         >
>>         > I really like the idea. I see a great value in the proposed
>>         declarative
>>         > resource management, in terms of flexibility, usability and
>>         efficiency.
>>         >
>>         > I have a few comments and questions regarding the FLIP
>>         design. In general,
>>         > the protocol design makes good sense to me. My main concern
>>         is that it is
>>         > not very clear to me what changes are required from the
>>         > Resource/SlotManager side to adapt to the new protocol.
>>         >
>>         > *1. Distributed slots across different jobs*
>>         >
>>         > Jobs which register their requirements first, will have
>>         precedence over
>>         >
>>         > other jobs also if the requirements change during the runtime.
>>         >
>>         > Just trying to understand, does this mean jobs are
>>         prioritized by the order
>>         > of their first resource declaring?
>>         >
>>         > *2. AllocationID*
>>         >
>>         > Is this FLIP suggesting to completely remove AllocationID?
>>         >
>>         > I'm fine with this change. It seems where AllocationID is
>>         used can either
>>         > be removed or be replaced by JobID. This reflects the
>>         concept that slots
>>         > are now assigned to a job instead of its individual slot
>>         requests.
>>         >
>>         > I would like to bring to attention that this also requires
>>         changes on the
>>         > TM side, with respect to FLIP-56[1].
>>         >
>>         > In the context of dynamic slot allocation introduced by
>>         FLIP-56, slots do
>>         > not pre-exist on TM and are dynamically created when RM calls
>>         > TaskExecutorGateway.requestSlot. Since the slots do not
>>         pre-exist, nor
>>         > their SlotIDs, RM requests slots from TM with a special
>>         SlotID (negative
>>         > slot index). The semantic changes from "requesting the slot
>>         identified by
>>         > the given SlotID" to "requesting a slot with the given
>>         resource profile".
>>         > The AllocationID is used for identifying the dynamic slots
>>         in such cases.
>>         >
>>         > >From the perspective of FLIP-56 and fine grained resource
>>         management, I'm
>>         > fine with removing AllocationID. In the meantime, we would
>>         need TM to
>>         > recognize the special negative indexed SlotID and generate
>>         a new unique
>>         > SlotID for identifying the slot.
>>         >
>>         > *3. Minimum resource requirement*
>>         >
>>         > However, we can let the JobMaster know if we cannot fulfill
>>         the minimum
>>         >
>>         > resource requirement for a job after
>>         > resourcemanager.standalone.start-up-time has passed.
>>         >
>>         > What is the "minimum resource requirement for a job"? Did I
>>         overlook
>>         > anything?
>>         >
>>         > *4. Offer/free slots between JM/TM*
>>         >
>>         > This probably deserves a separate discussion thread. Just
>>         want to bring it
>>         > up.
>>         >
>>         > The idea has been coming to me for quite some time. Is this
>>         design, that JM
>>         > requests resources from RM while accepting/releasing
>>         resources from/to TM,
>>         > the right thing?
>>         >
>>         > The pain point is that events of JM's activities
>>         (requesting/releasing
>>         > resources) arrive at RM out of order. This leads to several
>>         problems.
>>         >
>>         >    - When a job fails and task cancelation takes long, some
>>         of the slots
>>         >    might be released from the slot pool due to being unused
>>         for a while. Then
>>         >    the job restarts and requests these slots again. At this
>>         time, RM may
>>         >    receive slot requests before noticing from TM heartbeats
>>         that previous
>>         >    slots are released, thus requesting new resources. I've
>>         seen many times
>>         >    that the Yarn cluster has a heavy load and is not
>>         allocating resources
>>         >    quickly enough, which leads to slot request timeout and
>>         job failover, and
>>         >    during the failover more resources are requested which
>>         adds more load to
>>         >    the Yarn cluster. Happily, this should be improved with
>>         the declarative
>>         >    resource management. :)
>>         >    - As described in this FLIP, it is possible that RM
>>         learns the releasing
>>         >    of slots from TM heartbeat before noticing the resource
>>         requirement
>>         >    decreasing, it may allocate more resources which need to
>>         be released soon.
>>         >    - It complicates the ResourceManager/SlotManager, by
>>         requiring an
>>         >    additional slot state PENDING, which means the slot is
>>         assigned by RM but
>>         >    is not confirmed successfully ordered by TM.
>>         >
>>         > Why not just make RM offer the allocated resources (TM
>>         address, SlotID,
>>         > etc.) to JM, and JM release resources to RM? So that for
>>         all the resource
>>         > management JM talks to RM, and for the task deployment and
>>         execution it
>>         > talks to TM?
>>         >
>>         > I tried to understand the benefits for having the current
>>         design, and found
>>         > the following in FLIP-6[2].
>>         >
>>         >
>>         > All that the ResourceManager does is negotiate between the
>>         > cluster-manager, the JobManager, and the TaskManagers. Its
>>         state can hence
>>         > be reconstructed from re-acquiring containers and
>>         re-registration from
>>         > JobManagers and TaskManagers
>>         >
>>         > Correct me if I'm wrong, it seems the original purpose is
>>         to make sure the
>>         > assignment between jobs and slots are confirmed between JM
>>         and TMs, so that
>>         > failures of RM will not lead to any inconsistency. However,
>>         this only
>>         > benefits scenarios where RM fails while JM and TMs live.
>>         Currently, JM and
>>         > RM are in the same process. We do not really have any
>>         scenario where RM
>>         > fails alone. We might separate JM and RM to different
>>         processes in future,
>>         > but as far as I can see we don't have such requirements at
>>         the moment. It
>>         > seems to me that we are suffering the current problems,
>>         complying to
>>         > potential future benefits.
>>         >
>>         > Maybe I overlooked something.
>>         >
>>         > *5. Implementation Plan*
>>         >
>>         > For SlotPool, it sounds quite straightforward to "aggregate
>>         individual slot
>>         > requests".
>>         >
>>         > For Resource/SlotManager, it seems there are quite a lot
>>         changes needed,
>>         > with the removal of individual slot requests and
>>         AllocationID. It's not
>>         > clear to me what is the first step plan for RM/SM? Do we
>>         internally treat
>>         > the resource requirements as individual slot requests as
>>         the first step, so
>>         > only the interfaces are changed? Or do we actually change
>>         (practically
>>         > re-write) the slot allocation logics?
>>         >
>>         > Thank you~
>>         >
>>         > Xintong Song
>>         >
>>         >
>>         >
>>         [1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
>>         >
>>         [2]https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>>         >
>>         > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler
>>         <[hidden email] <mailto:[hidden email]>>
>>         <[hidden email] <mailto:[hidden email]>> wrote:
>>         >
>>         >
>>         > Hello,
>>         >
>>         > in FLIP-138 we want to rework the way the JobMaster
>>         acquires slots, such
>>         > that required resources are declared before a job is
>>         scheduled and th
>>         > job execution is adjusted according to the provided
>>         resources (e.g.,
>>         > reducing parallelism), instead of asking for a fixed number
>>         of resources
>>         > during scheduling and failing midway through if not enough
>>         resources are
>>         > available.
>>         >
>>         > This is a stepping stone towards reactive mode, where Flink
>>         will
>>         > automatically make use of new TaskExecutors being started.
>>         >
>>         > More details can be found here
>>         >
>>         <https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
>>         >
>>         > .
>>         >
>>         >
>>         >
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-138: Declarative Resource management

Zhu Zhu
Thanks for the explanation @Chesnay Schepler <[hidden email]> .

Yes, for batch jobs it can be safe to schedule downstream vertices if there
are enough slots in the pool, even if these slots are still in use at that
moment.
And the job can still progress even if the vertices stick to the original
parallelism.

Looks to me several decision makings can be different for streaming and
batch jobs.
Looking forward to the follow-up FLIP on the lazy ExecutionGraph
construction!

Thanks,
Zhu

Chesnay Schepler <[hidden email]> 于2020年8月28日周五 下午4:35写道:

> Maybe :)
>
> Imagine a case where the producer and consumer have the same
> ResourceProfile, or at least one where the consumer requirements are less
> than the producer ones.
> In this case, the scheduler can happily schedule consumers, because it
> knows it will get enough slots.
>
> If the profiles are different, then the Scheduler _may_ wait
> numberOf(producer) slots; it _may_ also stick with the parallelism and
> schedule right away, in the worst case running the consumers in sequence.
> In fact, for batch jobs there is probably(?) never a reason for the
> scheduler to _reduce_ the parallelism; it can always try to run things in
> sequence if it doesn't get enough slots.
> Reducing the parallelism would just mean that you'd have to wait for more
> producers to finish.
>
> The scope of this FLIP is just the protocol, without changes to the
> scheduler; in other words just changing how slots are acquired, but change
> nothing about the scheduling. That is tackled in a follow-up FLIP.
>
> On 28/08/2020 07:34, Zhu Zhu wrote:
>
> Thanks for the response!
>
> >> The scheduler doesn't have to wait for one stage to finish
> Does it mean we will declare resources and decide the parallelism for a
> stage which is partially
> schedulable, i.e. when input data are ready just for part of the execution
> vertices?
>
> >> This will get more complicated once we allow the scheduler to change
> the parallelism while the job is running
> Agreed. Looks to me it's a problem for batch jobs only and can be avoided
> for streaming jobs.
> Will this FLIP limit its scope to streaming jobs, and improvements for
> batch jobs are to be done later?
>
> Thanks,
> Zhu
>
> Chesnay Schepler <[hidden email]> 于2020年8月28日周五 上午2:27写道:
>
>> The scheduler doesn't have to wait for one stage to finish. It is still
>> aware that the upstream execution vertex has finished, and can request/use
>> slots accordingly to schedule the consumer.
>>
>> This will get more complicated once we allow the scheduler to change the
>> parallelism while the job is running, for which we will need some
>> enhancements to the network stack to allow the producer to run without
>> knowing the consumer parallelism ahead of time. I'm not too clear on the
>> details, but we'll some form of keygroup-like approach for sub partitions
>> (maxParallelism and all that).
>>
>> On 27/08/2020 20:05, Zhu Zhu wrote:
>>
>> Thanks Chesnay&Till for proposing this improvement.
>> It's of good value to allow jobs to make best use of available resources
>> adaptively. Not
>> to mention it further supports reactive mode.
>> So big +1 for it.
>>
>> I have a minor concern about possible regression in certain cases due to
>> the proposed
>> JobVertex-wise scheduling which replaces current ExecutionVertex-wise
>> scheduling.
>> In the proposal, looks to me it requires a stage to finish before its
>> consumer stage can be
>> scheduled. This limitation, however, does not exist in current scheduler.
>> In the case that there
>> exists a POINTWISE BLOCKING edge, the downstream execution region can be
>> scheduled
>> right after its connected upstream execution vertices finishes, even
>> before the whole upstream
>> stage finishes. This allows the region to be launched earlier and make
>> use of available resources.
>> Do we need to let the new scheduler retain this property?
>>
>> Thanks,
>> Zhu
>>
>> Xintong Song <[hidden email]> 于2020年8月26日周三 下午6:59写道:
>>
>>> Thanks for the quick response.
>>>
>>> *Job prioritization, Allocation IDs, Minimum resource
>>> requirements, SlotManager Implementation Plan:* Sounds good to me.
>>>
>>> *FLIP-56*
>>> Good point about the trade-off. I believe maximum resource utilization
>>> and
>>> quick deployment are desired in different scenarios. E.g., a long running
>>> streaming job deserves some deployment latency to improve the resource
>>> utilization, which benefits the entire lifecycle of the job. On the other
>>> hand, short batch queries may prefer quick deployment, otherwise the time
>>> for resource allocation might significantly increase the response time.
>>> It would be good enough for me to bring these questions to attention.
>>> Nothing that I'm aware of should block this FLIP.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <[hidden email]>
>>> wrote:
>>>
>>> > Thank you Xintong for your questions!
>>> > Job prioritization
>>> > Yes, the job which declares it's initial requirements first is
>>> prioritized.
>>> > This is very much for simplicity; for example this avoids the nasty
>>> case
>>> > where all jobs get some resources, but none get enough to actually run
>>> the
>>> > job.
>>> > Minimum resource requirements
>>> >
>>> > My bad; at some point we want to allow the JobMaster to declare a
>>> range of
>>> > resources it could use to run a job, for example min=1, target=10,
>>> > max=+inf.
>>> >
>>> > With this model, the RM would then try to balance the resources such
>>> that
>>> > as many jobs as possible are as close to the target state as possible.
>>> >
>>> > Currently, the minimum/target/maximum resources are all the same. So
>>> the
>>> > notification is sent whenever the current requirements cannot be met.
>>> > Allocation IDs
>>> > We do intend to, at the very least, remove AllocationIDs on the
>>> > SlotManager side, as they are just not required there.
>>> >
>>> > On the slotpool side we have to keep them around at least until the
>>> > existing Slotpool implementations are removed (not sure whether we'll
>>> fully
>>> > commit to this in 1.12), since the interfaces use AllocationIDs, which
>>> also
>>> > bleed into the JobMaster.
>>> > The TaskExecutor is in a similar position.
>>> > But in the long-term, yes they will be removed, and most usages will
>>> > probably be replaced by the SlotID.
>>> > FLIP-56
>>> >
>>> > Dynamic slot allocations are indeed quite interesting and raise a few
>>> > questions; for example, the main purpose of it is to ensure maximum
>>> > resource utilization. In that case, should the JobMaster be allowed to
>>> > re-use a slot it if the task requires less resources than the slot
>>> > provides, or should it always request a new slot that exactly matches?
>>> >
>>> > There is a trade-off to be made between maximum resource utilization
>>> > (request exactly matching slots, and only re-use exact matches) and
>>> quicker
>>> > job deployment (re-use slot even if they don't exactly match, skip
>>> > round-trip to RM).
>>> >
>>> > As for how to handle the lack of a preemptively known SlotIDs, that
>>> should
>>> > be fine in and of itself; we already handle a similar case when we
>>> request
>>> > a new TaskExecutor to be started. So long as there is some way to know
>>> how
>>> > many resources the TaskExecutor has in total I do not see a problem at
>>> the
>>> > moment. We will get the SlotID eventually by virtue of the heartbeat
>>> > SlotReport.
>>> > Implementation plan (SlotManager)
>>> > You are on the right track. The SlotManager tracks the declared
>>> resource
>>> > requirements, and if the requirements increased it creates a
>>> SlotRequest,
>>> > which then goes through similar code paths as we have at the moment
>>> (try to
>>> > find a free slot, if found tell the TM, otherwise try to request new
>>> TM).
>>> > The SlotManager changes are not that substantial to get a working
>>> version;
>>> > we have a PoC and most of the work went into refactoring the
>>> SlotManager
>>> > into a more manageable state. (split into several components, stricter
>>> and
>>> > simplified Slot life-cycle, ...).
>>> > Offer/free slots between JM/TM
>>> > Gotta run, but that's a good question and I'll think about. But I
>>> think it
>>> > comes down to making less changes, and being able to leverage existing
>>> > reconciliation protocols.
>>> > Do note that TaskExecutor also explicitly inform the RM about freed
>>> slots;
>>> > the heartbeat slot report is just a safety net.
>>> > I'm not sure whether slot requests are able to overtake a slot release;
>>> > @till do you have thoughts on that?
>>> > As for the race condition between the requirements reduction and slot
>>> > release, if we run into problems we have the backup plan of only
>>> releasing
>>> > the slot after the requirement reduction has been acknowledged.
>>> >
>>> > On 26/08/2020 10:31, Xintong Song wrote:
>>> >
>>> > Thanks for preparing the FLIP and driving this discussion, @Chesnay &
>>> @Till.
>>> >
>>> > I really like the idea. I see a great value in the proposed declarative
>>> > resource management, in terms of flexibility, usability and efficiency.
>>> >
>>> > I have a few comments and questions regarding the FLIP design. In
>>> general,
>>> > the protocol design makes good sense to me. My main concern is that it
>>> is
>>> > not very clear to me what changes are required from the
>>> > Resource/SlotManager side to adapt to the new protocol.
>>> >
>>> > *1. Distributed slots across different jobs*
>>> >
>>> > Jobs which register their requirements first, will have precedence over
>>> >
>>> > other jobs also if the requirements change during the runtime.
>>> >
>>> > Just trying to understand, does this mean jobs are prioritized by the
>>> order
>>> > of their first resource declaring?
>>> >
>>> > *2. AllocationID*
>>> >
>>> > Is this FLIP suggesting to completely remove AllocationID?
>>> >
>>> > I'm fine with this change. It seems where AllocationID is used can
>>> either
>>> > be removed or be replaced by JobID. This reflects the concept that
>>> slots
>>> > are now assigned to a job instead of its individual slot requests.
>>> >
>>> > I would like to bring to attention that this also requires changes on
>>> the
>>> > TM side, with respect to FLIP-56[1].
>>> >
>>> > In the context of dynamic slot allocation introduced by FLIP-56, slots
>>> do
>>> > not pre-exist on TM and are dynamically created when RM calls
>>> > TaskExecutorGateway.requestSlot. Since the slots do not pre-exist, nor
>>> > their SlotIDs, RM requests slots from TM with a special SlotID
>>> (negative
>>> > slot index). The semantic changes from "requesting the slot identified
>>> by
>>> > the given SlotID" to "requesting a slot with the given resource
>>> profile".
>>> > The AllocationID is used for identifying the dynamic slots in such
>>> cases.
>>> >
>>> > >From the perspective of FLIP-56 and fine grained resource management,
>>> I'm
>>> > fine with removing AllocationID. In the meantime, we would need TM to
>>> > recognize the special negative indexed SlotID and generate a new unique
>>> > SlotID for identifying the slot.
>>> >
>>> > *3. Minimum resource requirement*
>>> >
>>> > However, we can let the JobMaster know if we cannot fulfill the minimum
>>> >
>>> > resource requirement for a job after
>>> > resourcemanager.standalone.start-up-time has passed.
>>> >
>>> > What is the "minimum resource requirement for a job"? Did I overlook
>>> > anything?
>>> >
>>> > *4. Offer/free slots between JM/TM*
>>> >
>>> > This probably deserves a separate discussion thread. Just want to
>>> bring it
>>> > up.
>>> >
>>> > The idea has been coming to me for quite some time. Is this design,
>>> that JM
>>> > requests resources from RM while accepting/releasing resources from/to
>>> TM,
>>> > the right thing?
>>> >
>>> > The pain point is that events of JM's activities (requesting/releasing
>>> > resources) arrive at RM out of order. This leads to several problems.
>>> >
>>> >    - When a job fails and task cancelation takes long, some of the
>>> slots
>>> >    might be released from the slot pool due to being unused for a
>>> while. Then
>>> >    the job restarts and requests these slots again. At this time, RM
>>> may
>>> >    receive slot requests before noticing from TM heartbeats that
>>> previous
>>> >    slots are released, thus requesting new resources. I've seen many
>>> times
>>> >    that the Yarn cluster has a heavy load and is not allocating
>>> resources
>>> >    quickly enough, which leads to slot request timeout and job
>>> failover, and
>>> >    during the failover more resources are requested which adds more
>>> load to
>>> >    the Yarn cluster. Happily, this should be improved with the
>>> declarative
>>> >    resource management. :)
>>> >    - As described in this FLIP, it is possible that RM learns the
>>> releasing
>>> >    of slots from TM heartbeat before noticing the resource requirement
>>> >    decreasing, it may allocate more resources which need to be
>>> released soon.
>>> >    - It complicates the ResourceManager/SlotManager, by requiring an
>>> >    additional slot state PENDING, which means the slot is assigned by
>>> RM but
>>> >    is not confirmed successfully ordered by TM.
>>> >
>>> > Why not just make RM offer the allocated resources (TM address, SlotID,
>>> > etc.) to JM, and JM release resources to RM? So that for all the
>>> resource
>>> > management JM talks to RM, and for the task deployment and execution it
>>> > talks to TM?
>>> >
>>> > I tried to understand the benefits for having the current design, and
>>> found
>>> > the following in FLIP-6[2].
>>> >
>>> >
>>> > All that the ResourceManager does is negotiate between the
>>> > cluster-manager, the JobManager, and the TaskManagers. Its state can
>>> hence
>>> > be reconstructed from re-acquiring containers and re-registration from
>>> > JobManagers and TaskManagers
>>> >
>>> > Correct me if I'm wrong, it seems the original purpose is to make sure
>>> the
>>> > assignment between jobs and slots are confirmed between JM and TMs, so
>>> that
>>> > failures of RM will not lead to any inconsistency. However, this only
>>> > benefits scenarios where RM fails while JM and TMs live. Currently, JM
>>> and
>>> > RM are in the same process. We do not really have any scenario where RM
>>> > fails alone. We might separate JM and RM to different processes in
>>> future,
>>> > but as far as I can see we don't have such requirements at the moment.
>>> It
>>> > seems to me that we are suffering the current problems, complying to
>>> > potential future benefits.
>>> >
>>> > Maybe I overlooked something.
>>> >
>>> > *5. Implementation Plan*
>>> >
>>> > For SlotPool, it sounds quite straightforward to "aggregate individual
>>> slot
>>> > requests".
>>> >
>>> > For Resource/SlotManager, it seems there are quite a lot changes
>>> needed,
>>> > with the removal of individual slot requests and AllocationID. It's not
>>> > clear to me what is the first step plan for RM/SM? Do we internally
>>> treat
>>> > the resource requirements as individual slot requests as the first
>>> step, so
>>> > only the interfaces are changed? Or do we actually change (practically
>>> > re-write) the slot allocation logics?
>>> >
>>> > Thank you~
>>> >
>>> > Xintong Song
>>> >
>>> >
>>> > [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
>>> > [2]
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>>> >
>>> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <[hidden email]>
>>> <[hidden email]> wrote:
>>> >
>>> >
>>> > Hello,
>>> >
>>> > in FLIP-138 we want to rework the way the JobMaster acquires slots,
>>> such
>>> > that required resources are declared before a job is scheduled and th
>>> > job execution is adjusted according to the provided resources (e.g.,
>>> > reducing parallelism), instead of asking for a fixed number of
>>> resources
>>> > during scheduling and failing midway through if not enough resources
>>> are
>>> > available.
>>> >
>>> > This is a stepping stone towards reactive mode, where Flink will
>>> > automatically make use of new TaskExecutors being started.
>>> >
>>> > More details can be found here
>>> > <
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
>>> >
>>> > .
>>> >
>>> >
>>> >
>>>
>>
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-138: Declarative Resource management

Till Rohrmann
Thanks for creating this FLIP @Chesnay and the good input @Xintong and @Zhu
Zhu.

Let me try to add some comments concerning your questions:

# FLIP-56

I think there is nothing fundamentally contradicting FLIP-56 in the FLIP
for declarative resource management. As Chesnay said, we have to keep the
AllocationID around as long as we have the old scheduler implementation.
Once it is replaced, we can think about using the SlotID instead of
AllocationIDs for identifying allocated slots. For dynamic slots we can
keep the special meaning of a SlotID with a negative index. In the future
we might think about making this encoding a bit more explicit by sending a
richer slot request object and reporting the actual SlotID back to the RM.

For the question of resource utilization vs. deployment latency I believe
that this will be a question of requirements and preferences as you've said
Xintong. I can see that we will have different strategies to fulfill the
different needs.

# Offer/free slots between JM/TM

You are right Xintong that the existing slot protocol was developed with
the assumption in mind that the RM and JM can run in separate processes and
that a failure of the RM should only affect the JM in the sense that it
cannot ask for more resources. I believe that one could simplify things a
bit under the assumption that the RM and JM are always colocated in the
same process. However, the discussion whether to change it or not should
indeed be a separate one.

Changing the slot protocol to a declarative resource management should
already solve the first problem you have described because we won't ask for
new slots in case of a failover but simply keep the same resource
requirements declared and let the RM make sure that we will receive at
least this amount of slots.

If releasing a slot should lead to allocating new resources because
decreasing the resource requirement declaration takes longer than releasing
the slot on the TM, then we could apply what Chesnay said. By waiting on
the confirmation of the resource requirement decrease and then freeing the
slot on the TM gives you effectively the same behaviour as if the freeing
of the slot would be done by the RM.

I am not entirely sure whether allocating the slots and receiving the slot
offers through the RM will allow us to get rid of the pending slot state on
the RM side. If the RM needs to communicate with the TM and we want to have
a reconciliation protocol between these components, then I think we would
have to solve the exact same problem of currently waiting on the TM for
confirming that a slot has been allocated.

# Implications for the scheduling

The FLIP does not fully cover the changes for the scheduler and mainly
drafts the rough idea. For the batch scheduling, I believe that we have a
couple degrees of freedom in how to do things. In the scenario you
described, one could choose a simple strategy where we wait for all
producers to stop before deciding on the parallelism of the consumer and
scheduling the respective tasks (even though they have POINTWISE BLOCKING
edges). Or we can try to be smart and say if we get at least one slot that
we can run the consumers with the same parallelism as the producers it just
might be that we have to run them one after another in a single slot. One
advantage of not directly schedule the first consumer when the first
producer is finished is that one might schedule the consumer stage with a
higher parallelism because one might acquire more resources a bit later.
But I would see this as different execution strategies which have different
properties.

Cheers,
Till

On Fri, Aug 28, 2020 at 11:21 AM Zhu Zhu <[hidden email]> wrote:

> Thanks for the explanation @Chesnay Schepler <[hidden email]> .
>
> Yes, for batch jobs it can be safe to schedule downstream vertices if
> there
> are enough slots in the pool, even if these slots are still in use at that
> moment.
> And the job can still progress even if the vertices stick to the original
> parallelism.
>
> Looks to me several decision makings can be different for streaming and
> batch jobs.
> Looking forward to the follow-up FLIP on the lazy ExecutionGraph
> construction!
>
> Thanks,
> Zhu
>
> Chesnay Schepler <[hidden email]> 于2020年8月28日周五 下午4:35写道:
>
>> Maybe :)
>>
>> Imagine a case where the producer and consumer have the same
>> ResourceProfile, or at least one where the consumer requirements are less
>> than the producer ones.
>> In this case, the scheduler can happily schedule consumers, because it
>> knows it will get enough slots.
>>
>> If the profiles are different, then the Scheduler _may_ wait
>> numberOf(producer) slots; it _may_ also stick with the parallelism and
>> schedule right away, in the worst case running the consumers in sequence.
>> In fact, for batch jobs there is probably(?) never a reason for the
>> scheduler to _reduce_ the parallelism; it can always try to run things in
>> sequence if it doesn't get enough slots.
>> Reducing the parallelism would just mean that you'd have to wait for more
>> producers to finish.
>>
>> The scope of this FLIP is just the protocol, without changes to the
>> scheduler; in other words just changing how slots are acquired, but change
>> nothing about the scheduling. That is tackled in a follow-up FLIP.
>>
>> On 28/08/2020 07:34, Zhu Zhu wrote:
>>
>> Thanks for the response!
>>
>> >> The scheduler doesn't have to wait for one stage to finish
>> Does it mean we will declare resources and decide the parallelism for a
>> stage which is partially
>> schedulable, i.e. when input data are ready just for part of the
>> execution vertices?
>>
>> >> This will get more complicated once we allow the scheduler to change
>> the parallelism while the job is running
>> Agreed. Looks to me it's a problem for batch jobs only and can be avoided
>> for streaming jobs.
>> Will this FLIP limit its scope to streaming jobs, and improvements for
>> batch jobs are to be done later?
>>
>> Thanks,
>> Zhu
>>
>> Chesnay Schepler <[hidden email]> 于2020年8月28日周五 上午2:27写道:
>>
>>> The scheduler doesn't have to wait for one stage to finish. It is still
>>> aware that the upstream execution vertex has finished, and can request/use
>>> slots accordingly to schedule the consumer.
>>>
>>> This will get more complicated once we allow the scheduler to change the
>>> parallelism while the job is running, for which we will need some
>>> enhancements to the network stack to allow the producer to run without
>>> knowing the consumer parallelism ahead of time. I'm not too clear on the
>>> details, but we'll some form of keygroup-like approach for sub partitions
>>> (maxParallelism and all that).
>>>
>>> On 27/08/2020 20:05, Zhu Zhu wrote:
>>>
>>> Thanks Chesnay&Till for proposing this improvement.
>>> It's of good value to allow jobs to make best use of available resources
>>> adaptively. Not
>>> to mention it further supports reactive mode.
>>> So big +1 for it.
>>>
>>> I have a minor concern about possible regression in certain cases due to
>>> the proposed
>>> JobVertex-wise scheduling which replaces current ExecutionVertex-wise
>>> scheduling.
>>> In the proposal, looks to me it requires a stage to finish before its
>>> consumer stage can be
>>> scheduled. This limitation, however, does not exist in current
>>> scheduler. In the case that there
>>> exists a POINTWISE BLOCKING edge, the downstream execution region can be
>>> scheduled
>>> right after its connected upstream execution vertices finishes, even
>>> before the whole upstream
>>> stage finishes. This allows the region to be launched earlier and make
>>> use of available resources.
>>> Do we need to let the new scheduler retain this property?
>>>
>>> Thanks,
>>> Zhu
>>>
>>> Xintong Song <[hidden email]> 于2020年8月26日周三 下午6:59写道:
>>>
>>>> Thanks for the quick response.
>>>>
>>>> *Job prioritization, Allocation IDs, Minimum resource
>>>> requirements, SlotManager Implementation Plan:* Sounds good to me.
>>>>
>>>> *FLIP-56*
>>>> Good point about the trade-off. I believe maximum resource utilization
>>>> and
>>>> quick deployment are desired in different scenarios. E.g., a long
>>>> running
>>>> streaming job deserves some deployment latency to improve the resource
>>>> utilization, which benefits the entire lifecycle of the job. On the
>>>> other
>>>> hand, short batch queries may prefer quick deployment, otherwise the
>>>> time
>>>> for resource allocation might significantly increase the response time.
>>>> It would be good enough for me to bring these questions to attention.
>>>> Nothing that I'm aware of should block this FLIP.
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>>
>>>> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <[hidden email]>
>>>> wrote:
>>>>
>>>> > Thank you Xintong for your questions!
>>>> > Job prioritization
>>>> > Yes, the job which declares it's initial requirements first is
>>>> prioritized.
>>>> > This is very much for simplicity; for example this avoids the nasty
>>>> case
>>>> > where all jobs get some resources, but none get enough to actually
>>>> run the
>>>> > job.
>>>> > Minimum resource requirements
>>>> >
>>>> > My bad; at some point we want to allow the JobMaster to declare a
>>>> range of
>>>> > resources it could use to run a job, for example min=1, target=10,
>>>> > max=+inf.
>>>> >
>>>> > With this model, the RM would then try to balance the resources such
>>>> that
>>>> > as many jobs as possible are as close to the target state as possible.
>>>> >
>>>> > Currently, the minimum/target/maximum resources are all the same. So
>>>> the
>>>> > notification is sent whenever the current requirements cannot be met.
>>>> > Allocation IDs
>>>> > We do intend to, at the very least, remove AllocationIDs on the
>>>> > SlotManager side, as they are just not required there.
>>>> >
>>>> > On the slotpool side we have to keep them around at least until the
>>>> > existing Slotpool implementations are removed (not sure whether we'll
>>>> fully
>>>> > commit to this in 1.12), since the interfaces use AllocationIDs,
>>>> which also
>>>> > bleed into the JobMaster.
>>>> > The TaskExecutor is in a similar position.
>>>> > But in the long-term, yes they will be removed, and most usages will
>>>> > probably be replaced by the SlotID.
>>>> > FLIP-56
>>>> >
>>>> > Dynamic slot allocations are indeed quite interesting and raise a few
>>>> > questions; for example, the main purpose of it is to ensure maximum
>>>> > resource utilization. In that case, should the JobMaster be allowed to
>>>> > re-use a slot it if the task requires less resources than the slot
>>>> > provides, or should it always request a new slot that exactly matches?
>>>> >
>>>> > There is a trade-off to be made between maximum resource utilization
>>>> > (request exactly matching slots, and only re-use exact matches) and
>>>> quicker
>>>> > job deployment (re-use slot even if they don't exactly match, skip
>>>> > round-trip to RM).
>>>> >
>>>> > As for how to handle the lack of a preemptively known SlotIDs, that
>>>> should
>>>> > be fine in and of itself; we already handle a similar case when we
>>>> request
>>>> > a new TaskExecutor to be started. So long as there is some way to
>>>> know how
>>>> > many resources the TaskExecutor has in total I do not see a problem
>>>> at the
>>>> > moment. We will get the SlotID eventually by virtue of the heartbeat
>>>> > SlotReport.
>>>> > Implementation plan (SlotManager)
>>>> > You are on the right track. The SlotManager tracks the declared
>>>> resource
>>>> > requirements, and if the requirements increased it creates a
>>>> SlotRequest,
>>>> > which then goes through similar code paths as we have at the moment
>>>> (try to
>>>> > find a free slot, if found tell the TM, otherwise try to request new
>>>> TM).
>>>> > The SlotManager changes are not that substantial to get a working
>>>> version;
>>>> > we have a PoC and most of the work went into refactoring the
>>>> SlotManager
>>>> > into a more manageable state. (split into several components,
>>>> stricter and
>>>> > simplified Slot life-cycle, ...).
>>>> > Offer/free slots between JM/TM
>>>> > Gotta run, but that's a good question and I'll think about. But I
>>>> think it
>>>> > comes down to making less changes, and being able to leverage existing
>>>> > reconciliation protocols.
>>>> > Do note that TaskExecutor also explicitly inform the RM about freed
>>>> slots;
>>>> > the heartbeat slot report is just a safety net.
>>>> > I'm not sure whether slot requests are able to overtake a slot
>>>> release;
>>>> > @till do you have thoughts on that?
>>>> > As for the race condition between the requirements reduction and slot
>>>> > release, if we run into problems we have the backup plan of only
>>>> releasing
>>>> > the slot after the requirement reduction has been acknowledged.
>>>> >
>>>> > On 26/08/2020 10:31, Xintong Song wrote:
>>>> >
>>>> > Thanks for preparing the FLIP and driving this discussion, @Chesnay &
>>>> @Till.
>>>> >
>>>> > I really like the idea. I see a great value in the proposed
>>>> declarative
>>>> > resource management, in terms of flexibility, usability and
>>>> efficiency.
>>>> >
>>>> > I have a few comments and questions regarding the FLIP design. In
>>>> general,
>>>> > the protocol design makes good sense to me. My main concern is that
>>>> it is
>>>> > not very clear to me what changes are required from the
>>>> > Resource/SlotManager side to adapt to the new protocol.
>>>> >
>>>> > *1. Distributed slots across different jobs*
>>>> >
>>>> > Jobs which register their requirements first, will have precedence
>>>> over
>>>> >
>>>> > other jobs also if the requirements change during the runtime.
>>>> >
>>>> > Just trying to understand, does this mean jobs are prioritized by the
>>>> order
>>>> > of their first resource declaring?
>>>> >
>>>> > *2. AllocationID*
>>>> >
>>>> > Is this FLIP suggesting to completely remove AllocationID?
>>>> >
>>>> > I'm fine with this change. It seems where AllocationID is used can
>>>> either
>>>> > be removed or be replaced by JobID. This reflects the concept that
>>>> slots
>>>> > are now assigned to a job instead of its individual slot requests.
>>>> >
>>>> > I would like to bring to attention that this also requires changes on
>>>> the
>>>> > TM side, with respect to FLIP-56[1].
>>>> >
>>>> > In the context of dynamic slot allocation introduced by FLIP-56,
>>>> slots do
>>>> > not pre-exist on TM and are dynamically created when RM calls
>>>> > TaskExecutorGateway.requestSlot. Since the slots do not pre-exist, nor
>>>> > their SlotIDs, RM requests slots from TM with a special SlotID
>>>> (negative
>>>> > slot index). The semantic changes from "requesting the slot
>>>> identified by
>>>> > the given SlotID" to "requesting a slot with the given resource
>>>> profile".
>>>> > The AllocationID is used for identifying the dynamic slots in such
>>>> cases.
>>>> >
>>>> > >From the perspective of FLIP-56 and fine grained resource
>>>> management, I'm
>>>> > fine with removing AllocationID. In the meantime, we would need TM to
>>>> > recognize the special negative indexed SlotID and generate a new
>>>> unique
>>>> > SlotID for identifying the slot.
>>>> >
>>>> > *3. Minimum resource requirement*
>>>> >
>>>> > However, we can let the JobMaster know if we cannot fulfill the
>>>> minimum
>>>> >
>>>> > resource requirement for a job after
>>>> > resourcemanager.standalone.start-up-time has passed.
>>>> >
>>>> > What is the "minimum resource requirement for a job"? Did I overlook
>>>> > anything?
>>>> >
>>>> > *4. Offer/free slots between JM/TM*
>>>> >
>>>> > This probably deserves a separate discussion thread. Just want to
>>>> bring it
>>>> > up.
>>>> >
>>>> > The idea has been coming to me for quite some time. Is this design,
>>>> that JM
>>>> > requests resources from RM while accepting/releasing resources
>>>> from/to TM,
>>>> > the right thing?
>>>> >
>>>> > The pain point is that events of JM's activities (requesting/releasing
>>>> > resources) arrive at RM out of order. This leads to several problems.
>>>> >
>>>> >    - When a job fails and task cancelation takes long, some of the
>>>> slots
>>>> >    might be released from the slot pool due to being unused for a
>>>> while. Then
>>>> >    the job restarts and requests these slots again. At this time, RM
>>>> may
>>>> >    receive slot requests before noticing from TM heartbeats that
>>>> previous
>>>> >    slots are released, thus requesting new resources. I've seen many
>>>> times
>>>> >    that the Yarn cluster has a heavy load and is not allocating
>>>> resources
>>>> >    quickly enough, which leads to slot request timeout and job
>>>> failover, and
>>>> >    during the failover more resources are requested which adds more
>>>> load to
>>>> >    the Yarn cluster. Happily, this should be improved with the
>>>> declarative
>>>> >    resource management. :)
>>>> >    - As described in this FLIP, it is possible that RM learns the
>>>> releasing
>>>> >    of slots from TM heartbeat before noticing the resource requirement
>>>> >    decreasing, it may allocate more resources which need to be
>>>> released soon.
>>>> >    - It complicates the ResourceManager/SlotManager, by requiring an
>>>> >    additional slot state PENDING, which means the slot is assigned by
>>>> RM but
>>>> >    is not confirmed successfully ordered by TM.
>>>> >
>>>> > Why not just make RM offer the allocated resources (TM address,
>>>> SlotID,
>>>> > etc.) to JM, and JM release resources to RM? So that for all the
>>>> resource
>>>> > management JM talks to RM, and for the task deployment and execution
>>>> it
>>>> > talks to TM?
>>>> >
>>>> > I tried to understand the benefits for having the current design, and
>>>> found
>>>> > the following in FLIP-6[2].
>>>> >
>>>> >
>>>> > All that the ResourceManager does is negotiate between the
>>>> > cluster-manager, the JobManager, and the TaskManagers. Its state can
>>>> hence
>>>> > be reconstructed from re-acquiring containers and re-registration from
>>>> > JobManagers and TaskManagers
>>>> >
>>>> > Correct me if I'm wrong, it seems the original purpose is to make
>>>> sure the
>>>> > assignment between jobs and slots are confirmed between JM and TMs,
>>>> so that
>>>> > failures of RM will not lead to any inconsistency. However, this only
>>>> > benefits scenarios where RM fails while JM and TMs live. Currently,
>>>> JM and
>>>> > RM are in the same process. We do not really have any scenario where
>>>> RM
>>>> > fails alone. We might separate JM and RM to different processes in
>>>> future,
>>>> > but as far as I can see we don't have such requirements at the
>>>> moment. It
>>>> > seems to me that we are suffering the current problems, complying to
>>>> > potential future benefits.
>>>> >
>>>> > Maybe I overlooked something.
>>>> >
>>>> > *5. Implementation Plan*
>>>> >
>>>> > For SlotPool, it sounds quite straightforward to "aggregate
>>>> individual slot
>>>> > requests".
>>>> >
>>>> > For Resource/SlotManager, it seems there are quite a lot changes
>>>> needed,
>>>> > with the removal of individual slot requests and AllocationID. It's
>>>> not
>>>> > clear to me what is the first step plan for RM/SM? Do we internally
>>>> treat
>>>> > the resource requirements as individual slot requests as the first
>>>> step, so
>>>> > only the interfaces are changed? Or do we actually change (practically
>>>> > re-write) the slot allocation logics?
>>>> >
>>>> > Thank you~
>>>> >
>>>> > Xintong Song
>>>> >
>>>> >
>>>> > [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
>>>> > [2]
>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>>>> >
>>>> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <[hidden email]>
>>>> <[hidden email]> wrote:
>>>> >
>>>> >
>>>> > Hello,
>>>> >
>>>> > in FLIP-138 we want to rework the way the JobMaster acquires slots,
>>>> such
>>>> > that required resources are declared before a job is scheduled and th
>>>> > job execution is adjusted according to the provided resources (e.g.,
>>>> > reducing parallelism), instead of asking for a fixed number of
>>>> resources
>>>> > during scheduling and failing midway through if not enough resources
>>>> are
>>>> > available.
>>>> >
>>>> > This is a stepping stone towards reactive mode, where Flink will
>>>> > automatically make use of new TaskExecutors being started.
>>>> >
>>>> > More details can be found here
>>>> > <
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
>>>> >
>>>> > .
>>>> >
>>>> >
>>>> >
>>>>
>>>
>>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-138: Declarative Resource management

Xintong Song
Thanks for the clarification, @Till.

- For FLIP-56, sounds good to me. I think there should be no problem before
removing AllocationID. And even after replacing AllocationID, it should
only require limited effort to make FLIP-56 work with SlotID. I was just
trying to understand when the effort will be needed.

- For offer/release slots between JM/TM, I think you are right.
Waiting on the confirmation for resource requirement decrease before
freeing the slot is quite equivalent to releasing slots through RM, in
terms of it practically preventing JM from releasing slots when the RM is
absent. But this approach obviously requires less change to the current
mechanism.
Since the first problem can be solved by the declarative protocol, and the
second problem can be addressed by this confirmation based approach, ATM I
don't see any strong reason for changing to offering and releasing slots
through RM, especially considering the significant changes it requires.

Thank you~

Xintong Song



On Fri, Aug 28, 2020 at 10:07 PM Till Rohrmann <[hidden email]> wrote:

> Thanks for creating this FLIP @Chesnay and the good input @Xintong and @Zhu
> Zhu.
>
> Let me try to add some comments concerning your questions:
>
> # FLIP-56
>
> I think there is nothing fundamentally contradicting FLIP-56 in the FLIP
> for declarative resource management. As Chesnay said, we have to keep the
> AllocationID around as long as we have the old scheduler implementation.
> Once it is replaced, we can think about using the SlotID instead of
> AllocationIDs for identifying allocated slots. For dynamic slots we can
> keep the special meaning of a SlotID with a negative index. In the future
> we might think about making this encoding a bit more explicit by sending a
> richer slot request object and reporting the actual SlotID back to the RM.
>
> For the question of resource utilization vs. deployment latency I believe
> that this will be a question of requirements and preferences as you've said
> Xintong. I can see that we will have different strategies to fulfill the
> different needs.
>
> # Offer/free slots between JM/TM
>
> You are right Xintong that the existing slot protocol was developed with
> the assumption in mind that the RM and JM can run in separate processes and
> that a failure of the RM should only affect the JM in the sense that it
> cannot ask for more resources. I believe that one could simplify things a
> bit under the assumption that the RM and JM are always colocated in the
> same process. However, the discussion whether to change it or not should
> indeed be a separate one.
>
> Changing the slot protocol to a declarative resource management should
> already solve the first problem you have described because we won't ask for
> new slots in case of a failover but simply keep the same resource
> requirements declared and let the RM make sure that we will receive at
> least this amount of slots.
>
> If releasing a slot should lead to allocating new resources because
> decreasing the resource requirement declaration takes longer than releasing
> the slot on the TM, then we could apply what Chesnay said. By waiting on
> the confirmation of the resource requirement decrease and then freeing the
> slot on the TM gives you effectively the same behaviour as if the freeing
> of the slot would be done by the RM.
>
> I am not entirely sure whether allocating the slots and receiving the slot
> offers through the RM will allow us to get rid of the pending slot state on
> the RM side. If the RM needs to communicate with the TM and we want to have
> a reconciliation protocol between these components, then I think we would
> have to solve the exact same problem of currently waiting on the TM for
> confirming that a slot has been allocated.
>
> # Implications for the scheduling
>
> The FLIP does not fully cover the changes for the scheduler and mainly
> drafts the rough idea. For the batch scheduling, I believe that we have a
> couple degrees of freedom in how to do things. In the scenario you
> described, one could choose a simple strategy where we wait for all
> producers to stop before deciding on the parallelism of the consumer and
> scheduling the respective tasks (even though they have POINTWISE BLOCKING
> edges). Or we can try to be smart and say if we get at least one slot that
> we can run the consumers with the same parallelism as the producers it just
> might be that we have to run them one after another in a single slot. One
> advantage of not directly schedule the first consumer when the first
> producer is finished is that one might schedule the consumer stage with a
> higher parallelism because one might acquire more resources a bit later.
> But I would see this as different execution strategies which have different
> properties.
>
> Cheers,
> Till
>
> On Fri, Aug 28, 2020 at 11:21 AM Zhu Zhu <[hidden email]> wrote:
>
> > Thanks for the explanation @Chesnay Schepler <[hidden email]> .
> >
> > Yes, for batch jobs it can be safe to schedule downstream vertices if
> > there
> > are enough slots in the pool, even if these slots are still in use at
> that
> > moment.
> > And the job can still progress even if the vertices stick to the original
> > parallelism.
> >
> > Looks to me several decision makings can be different for streaming and
> > batch jobs.
> > Looking forward to the follow-up FLIP on the lazy ExecutionGraph
> > construction!
> >
> > Thanks,
> > Zhu
> >
> > Chesnay Schepler <[hidden email]> 于2020年8月28日周五 下午4:35写道:
> >
> >> Maybe :)
> >>
> >> Imagine a case where the producer and consumer have the same
> >> ResourceProfile, or at least one where the consumer requirements are
> less
> >> than the producer ones.
> >> In this case, the scheduler can happily schedule consumers, because it
> >> knows it will get enough slots.
> >>
> >> If the profiles are different, then the Scheduler _may_ wait
> >> numberOf(producer) slots; it _may_ also stick with the parallelism and
> >> schedule right away, in the worst case running the consumers in
> sequence.
> >> In fact, for batch jobs there is probably(?) never a reason for the
> >> scheduler to _reduce_ the parallelism; it can always try to run things
> in
> >> sequence if it doesn't get enough slots.
> >> Reducing the parallelism would just mean that you'd have to wait for
> more
> >> producers to finish.
> >>
> >> The scope of this FLIP is just the protocol, without changes to the
> >> scheduler; in other words just changing how slots are acquired, but
> change
> >> nothing about the scheduling. That is tackled in a follow-up FLIP.
> >>
> >> On 28/08/2020 07:34, Zhu Zhu wrote:
> >>
> >> Thanks for the response!
> >>
> >> >> The scheduler doesn't have to wait for one stage to finish
> >> Does it mean we will declare resources and decide the parallelism for a
> >> stage which is partially
> >> schedulable, i.e. when input data are ready just for part of the
> >> execution vertices?
> >>
> >> >> This will get more complicated once we allow the scheduler to change
> >> the parallelism while the job is running
> >> Agreed. Looks to me it's a problem for batch jobs only and can be
> avoided
> >> for streaming jobs.
> >> Will this FLIP limit its scope to streaming jobs, and improvements for
> >> batch jobs are to be done later?
> >>
> >> Thanks,
> >> Zhu
> >>
> >> Chesnay Schepler <[hidden email]> 于2020年8月28日周五 上午2:27写道:
> >>
> >>> The scheduler doesn't have to wait for one stage to finish. It is still
> >>> aware that the upstream execution vertex has finished, and can
> request/use
> >>> slots accordingly to schedule the consumer.
> >>>
> >>> This will get more complicated once we allow the scheduler to change
> the
> >>> parallelism while the job is running, for which we will need some
> >>> enhancements to the network stack to allow the producer to run without
> >>> knowing the consumer parallelism ahead of time. I'm not too clear on
> the
> >>> details, but we'll some form of keygroup-like approach for sub
> partitions
> >>> (maxParallelism and all that).
> >>>
> >>> On 27/08/2020 20:05, Zhu Zhu wrote:
> >>>
> >>> Thanks Chesnay&Till for proposing this improvement.
> >>> It's of good value to allow jobs to make best use of available
> resources
> >>> adaptively. Not
> >>> to mention it further supports reactive mode.
> >>> So big +1 for it.
> >>>
> >>> I have a minor concern about possible regression in certain cases due
> to
> >>> the proposed
> >>> JobVertex-wise scheduling which replaces current ExecutionVertex-wise
> >>> scheduling.
> >>> In the proposal, looks to me it requires a stage to finish before its
> >>> consumer stage can be
> >>> scheduled. This limitation, however, does not exist in current
> >>> scheduler. In the case that there
> >>> exists a POINTWISE BLOCKING edge, the downstream execution region can
> be
> >>> scheduled
> >>> right after its connected upstream execution vertices finishes, even
> >>> before the whole upstream
> >>> stage finishes. This allows the region to be launched earlier and make
> >>> use of available resources.
> >>> Do we need to let the new scheduler retain this property?
> >>>
> >>> Thanks,
> >>> Zhu
> >>>
> >>> Xintong Song <[hidden email]> 于2020年8月26日周三 下午6:59写道:
> >>>
> >>>> Thanks for the quick response.
> >>>>
> >>>> *Job prioritization, Allocation IDs, Minimum resource
> >>>> requirements, SlotManager Implementation Plan:* Sounds good to me.
> >>>>
> >>>> *FLIP-56*
> >>>> Good point about the trade-off. I believe maximum resource utilization
> >>>> and
> >>>> quick deployment are desired in different scenarios. E.g., a long
> >>>> running
> >>>> streaming job deserves some deployment latency to improve the resource
> >>>> utilization, which benefits the entire lifecycle of the job. On the
> >>>> other
> >>>> hand, short batch queries may prefer quick deployment, otherwise the
> >>>> time
> >>>> for resource allocation might significantly increase the response
> time.
> >>>> It would be good enough for me to bring these questions to attention.
> >>>> Nothing that I'm aware of should block this FLIP.
> >>>>
> >>>> Thank you~
> >>>>
> >>>> Xintong Song
> >>>>
> >>>>
> >>>>
> >>>> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <[hidden email]>
> >>>> wrote:
> >>>>
> >>>> > Thank you Xintong for your questions!
> >>>> > Job prioritization
> >>>> > Yes, the job which declares it's initial requirements first is
> >>>> prioritized.
> >>>> > This is very much for simplicity; for example this avoids the nasty
> >>>> case
> >>>> > where all jobs get some resources, but none get enough to actually
> >>>> run the
> >>>> > job.
> >>>> > Minimum resource requirements
> >>>> >
> >>>> > My bad; at some point we want to allow the JobMaster to declare a
> >>>> range of
> >>>> > resources it could use to run a job, for example min=1, target=10,
> >>>> > max=+inf.
> >>>> >
> >>>> > With this model, the RM would then try to balance the resources such
> >>>> that
> >>>> > as many jobs as possible are as close to the target state as
> possible.
> >>>> >
> >>>> > Currently, the minimum/target/maximum resources are all the same. So
> >>>> the
> >>>> > notification is sent whenever the current requirements cannot be
> met.
> >>>> > Allocation IDs
> >>>> > We do intend to, at the very least, remove AllocationIDs on the
> >>>> > SlotManager side, as they are just not required there.
> >>>> >
> >>>> > On the slotpool side we have to keep them around at least until the
> >>>> > existing Slotpool implementations are removed (not sure whether
> we'll
> >>>> fully
> >>>> > commit to this in 1.12), since the interfaces use AllocationIDs,
> >>>> which also
> >>>> > bleed into the JobMaster.
> >>>> > The TaskExecutor is in a similar position.
> >>>> > But in the long-term, yes they will be removed, and most usages will
> >>>> > probably be replaced by the SlotID.
> >>>> > FLIP-56
> >>>> >
> >>>> > Dynamic slot allocations are indeed quite interesting and raise a
> few
> >>>> > questions; for example, the main purpose of it is to ensure maximum
> >>>> > resource utilization. In that case, should the JobMaster be allowed
> to
> >>>> > re-use a slot it if the task requires less resources than the slot
> >>>> > provides, or should it always request a new slot that exactly
> matches?
> >>>> >
> >>>> > There is a trade-off to be made between maximum resource utilization
> >>>> > (request exactly matching slots, and only re-use exact matches) and
> >>>> quicker
> >>>> > job deployment (re-use slot even if they don't exactly match, skip
> >>>> > round-trip to RM).
> >>>> >
> >>>> > As for how to handle the lack of a preemptively known SlotIDs, that
> >>>> should
> >>>> > be fine in and of itself; we already handle a similar case when we
> >>>> request
> >>>> > a new TaskExecutor to be started. So long as there is some way to
> >>>> know how
> >>>> > many resources the TaskExecutor has in total I do not see a problem
> >>>> at the
> >>>> > moment. We will get the SlotID eventually by virtue of the heartbeat
> >>>> > SlotReport.
> >>>> > Implementation plan (SlotManager)
> >>>> > You are on the right track. The SlotManager tracks the declared
> >>>> resource
> >>>> > requirements, and if the requirements increased it creates a
> >>>> SlotRequest,
> >>>> > which then goes through similar code paths as we have at the moment
> >>>> (try to
> >>>> > find a free slot, if found tell the TM, otherwise try to request new
> >>>> TM).
> >>>> > The SlotManager changes are not that substantial to get a working
> >>>> version;
> >>>> > we have a PoC and most of the work went into refactoring the
> >>>> SlotManager
> >>>> > into a more manageable state. (split into several components,
> >>>> stricter and
> >>>> > simplified Slot life-cycle, ...).
> >>>> > Offer/free slots between JM/TM
> >>>> > Gotta run, but that's a good question and I'll think about. But I
> >>>> think it
> >>>> > comes down to making less changes, and being able to leverage
> existing
> >>>> > reconciliation protocols.
> >>>> > Do note that TaskExecutor also explicitly inform the RM about freed
> >>>> slots;
> >>>> > the heartbeat slot report is just a safety net.
> >>>> > I'm not sure whether slot requests are able to overtake a slot
> >>>> release;
> >>>> > @till do you have thoughts on that?
> >>>> > As for the race condition between the requirements reduction and
> slot
> >>>> > release, if we run into problems we have the backup plan of only
> >>>> releasing
> >>>> > the slot after the requirement reduction has been acknowledged.
> >>>> >
> >>>> > On 26/08/2020 10:31, Xintong Song wrote:
> >>>> >
> >>>> > Thanks for preparing the FLIP and driving this discussion, @Chesnay
> &
> >>>> @Till.
> >>>> >
> >>>> > I really like the idea. I see a great value in the proposed
> >>>> declarative
> >>>> > resource management, in terms of flexibility, usability and
> >>>> efficiency.
> >>>> >
> >>>> > I have a few comments and questions regarding the FLIP design. In
> >>>> general,
> >>>> > the protocol design makes good sense to me. My main concern is that
> >>>> it is
> >>>> > not very clear to me what changes are required from the
> >>>> > Resource/SlotManager side to adapt to the new protocol.
> >>>> >
> >>>> > *1. Distributed slots across different jobs*
> >>>> >
> >>>> > Jobs which register their requirements first, will have precedence
> >>>> over
> >>>> >
> >>>> > other jobs also if the requirements change during the runtime.
> >>>> >
> >>>> > Just trying to understand, does this mean jobs are prioritized by
> the
> >>>> order
> >>>> > of their first resource declaring?
> >>>> >
> >>>> > *2. AllocationID*
> >>>> >
> >>>> > Is this FLIP suggesting to completely remove AllocationID?
> >>>> >
> >>>> > I'm fine with this change. It seems where AllocationID is used can
> >>>> either
> >>>> > be removed or be replaced by JobID. This reflects the concept that
> >>>> slots
> >>>> > are now assigned to a job instead of its individual slot requests.
> >>>> >
> >>>> > I would like to bring to attention that this also requires changes
> on
> >>>> the
> >>>> > TM side, with respect to FLIP-56[1].
> >>>> >
> >>>> > In the context of dynamic slot allocation introduced by FLIP-56,
> >>>> slots do
> >>>> > not pre-exist on TM and are dynamically created when RM calls
> >>>> > TaskExecutorGateway.requestSlot. Since the slots do not pre-exist,
> nor
> >>>> > their SlotIDs, RM requests slots from TM with a special SlotID
> >>>> (negative
> >>>> > slot index). The semantic changes from "requesting the slot
> >>>> identified by
> >>>> > the given SlotID" to "requesting a slot with the given resource
> >>>> profile".
> >>>> > The AllocationID is used for identifying the dynamic slots in such
> >>>> cases.
> >>>> >
> >>>> > >From the perspective of FLIP-56 and fine grained resource
> >>>> management, I'm
> >>>> > fine with removing AllocationID. In the meantime, we would need TM
> to
> >>>> > recognize the special negative indexed SlotID and generate a new
> >>>> unique
> >>>> > SlotID for identifying the slot.
> >>>> >
> >>>> > *3. Minimum resource requirement*
> >>>> >
> >>>> > However, we can let the JobMaster know if we cannot fulfill the
> >>>> minimum
> >>>> >
> >>>> > resource requirement for a job after
> >>>> > resourcemanager.standalone.start-up-time has passed.
> >>>> >
> >>>> > What is the "minimum resource requirement for a job"? Did I overlook
> >>>> > anything?
> >>>> >
> >>>> > *4. Offer/free slots between JM/TM*
> >>>> >
> >>>> > This probably deserves a separate discussion thread. Just want to
> >>>> bring it
> >>>> > up.
> >>>> >
> >>>> > The idea has been coming to me for quite some time. Is this design,
> >>>> that JM
> >>>> > requests resources from RM while accepting/releasing resources
> >>>> from/to TM,
> >>>> > the right thing?
> >>>> >
> >>>> > The pain point is that events of JM's activities
> (requesting/releasing
> >>>> > resources) arrive at RM out of order. This leads to several
> problems.
> >>>> >
> >>>> >    - When a job fails and task cancelation takes long, some of the
> >>>> slots
> >>>> >    might be released from the slot pool due to being unused for a
> >>>> while. Then
> >>>> >    the job restarts and requests these slots again. At this time, RM
> >>>> may
> >>>> >    receive slot requests before noticing from TM heartbeats that
> >>>> previous
> >>>> >    slots are released, thus requesting new resources. I've seen many
> >>>> times
> >>>> >    that the Yarn cluster has a heavy load and is not allocating
> >>>> resources
> >>>> >    quickly enough, which leads to slot request timeout and job
> >>>> failover, and
> >>>> >    during the failover more resources are requested which adds more
> >>>> load to
> >>>> >    the Yarn cluster. Happily, this should be improved with the
> >>>> declarative
> >>>> >    resource management. :)
> >>>> >    - As described in this FLIP, it is possible that RM learns the
> >>>> releasing
> >>>> >    of slots from TM heartbeat before noticing the resource
> requirement
> >>>> >    decreasing, it may allocate more resources which need to be
> >>>> released soon.
> >>>> >    - It complicates the ResourceManager/SlotManager, by requiring an
> >>>> >    additional slot state PENDING, which means the slot is assigned
> by
> >>>> RM but
> >>>> >    is not confirmed successfully ordered by TM.
> >>>> >
> >>>> > Why not just make RM offer the allocated resources (TM address,
> >>>> SlotID,
> >>>> > etc.) to JM, and JM release resources to RM? So that for all the
> >>>> resource
> >>>> > management JM talks to RM, and for the task deployment and execution
> >>>> it
> >>>> > talks to TM?
> >>>> >
> >>>> > I tried to understand the benefits for having the current design,
> and
> >>>> found
> >>>> > the following in FLIP-6[2].
> >>>> >
> >>>> >
> >>>> > All that the ResourceManager does is negotiate between the
> >>>> > cluster-manager, the JobManager, and the TaskManagers. Its state can
> >>>> hence
> >>>> > be reconstructed from re-acquiring containers and re-registration
> from
> >>>> > JobManagers and TaskManagers
> >>>> >
> >>>> > Correct me if I'm wrong, it seems the original purpose is to make
> >>>> sure the
> >>>> > assignment between jobs and slots are confirmed between JM and TMs,
> >>>> so that
> >>>> > failures of RM will not lead to any inconsistency. However, this
> only
> >>>> > benefits scenarios where RM fails while JM and TMs live. Currently,
> >>>> JM and
> >>>> > RM are in the same process. We do not really have any scenario where
> >>>> RM
> >>>> > fails alone. We might separate JM and RM to different processes in
> >>>> future,
> >>>> > but as far as I can see we don't have such requirements at the
> >>>> moment. It
> >>>> > seems to me that we are suffering the current problems, complying to
> >>>> > potential future benefits.
> >>>> >
> >>>> > Maybe I overlooked something.
> >>>> >
> >>>> > *5. Implementation Plan*
> >>>> >
> >>>> > For SlotPool, it sounds quite straightforward to "aggregate
> >>>> individual slot
> >>>> > requests".
> >>>> >
> >>>> > For Resource/SlotManager, it seems there are quite a lot changes
> >>>> needed,
> >>>> > with the removal of individual slot requests and AllocationID. It's
> >>>> not
> >>>> > clear to me what is the first step plan for RM/SM? Do we internally
> >>>> treat
> >>>> > the resource requirements as individual slot requests as the first
> >>>> step, so
> >>>> > only the interfaces are changed? Or do we actually change
> (practically
> >>>> > re-write) the slot allocation logics?
> >>>> >
> >>>> > Thank you~
> >>>> >
> >>>> > Xintong Song
> >>>> >
> >>>> >
> >>>> > [1]
> >>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> >>>> > [2]
> >>>>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
> >>>> >
> >>>> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <
> [hidden email]>
> >>>> <[hidden email]> wrote:
> >>>> >
> >>>> >
> >>>> > Hello,
> >>>> >
> >>>> > in FLIP-138 we want to rework the way the JobMaster acquires slots,
> >>>> such
> >>>> > that required resources are declared before a job is scheduled and
> th
> >>>> > job execution is adjusted according to the provided resources (e.g.,
> >>>> > reducing parallelism), instead of asking for a fixed number of
> >>>> resources
> >>>> > during scheduling and failing midway through if not enough resources
> >>>> are
> >>>> > available.
> >>>> >
> >>>> > This is a stepping stone towards reactive mode, where Flink will
> >>>> > automatically make use of new TaskExecutors being started.
> >>>> >
> >>>> > More details can be found here
> >>>> > <
> >>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
> >>>> >
> >>>> > .
> >>>> >
> >>>> >
> >>>> >
> >>>>
> >>>
> >>>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-138: Declarative Resource management

Zhu Zhu
Thanks for the clarification @Till Rohrmann <[hidden email]>

>> # Implications for the scheduling
Agreed that it turned out to be different execution strategies for batch
jobs.
We can have a simple one first and improve it later.

Thanks,
Zhu

Xintong Song <[hidden email]> 于2020年8月31日周一 下午3:05写道:

> Thanks for the clarification, @Till.
>
> - For FLIP-56, sounds good to me. I think there should be no problem before
> removing AllocationID. And even after replacing AllocationID, it should
> only require limited effort to make FLIP-56 work with SlotID. I was just
> trying to understand when the effort will be needed.
>
> - For offer/release slots between JM/TM, I think you are right.
> Waiting on the confirmation for resource requirement decrease before
> freeing the slot is quite equivalent to releasing slots through RM, in
> terms of it practically preventing JM from releasing slots when the RM is
> absent. But this approach obviously requires less change to the current
> mechanism.
> Since the first problem can be solved by the declarative protocol, and the
> second problem can be addressed by this confirmation based approach, ATM I
> don't see any strong reason for changing to offering and releasing slots
> through RM, especially considering the significant changes it requires.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Aug 28, 2020 at 10:07 PM Till Rohrmann <[hidden email]>
> wrote:
>
> > Thanks for creating this FLIP @Chesnay and the good input @Xintong and
> @Zhu
> > Zhu.
> >
> > Let me try to add some comments concerning your questions:
> >
> > # FLIP-56
> >
> > I think there is nothing fundamentally contradicting FLIP-56 in the FLIP
> > for declarative resource management. As Chesnay said, we have to keep the
> > AllocationID around as long as we have the old scheduler implementation.
> > Once it is replaced, we can think about using the SlotID instead of
> > AllocationIDs for identifying allocated slots. For dynamic slots we can
> > keep the special meaning of a SlotID with a negative index. In the future
> > we might think about making this encoding a bit more explicit by sending
> a
> > richer slot request object and reporting the actual SlotID back to the
> RM.
> >
> > For the question of resource utilization vs. deployment latency I believe
> > that this will be a question of requirements and preferences as you've
> said
> > Xintong. I can see that we will have different strategies to fulfill the
> > different needs.
> >
> > # Offer/free slots between JM/TM
> >
> > You are right Xintong that the existing slot protocol was developed with
> > the assumption in mind that the RM and JM can run in separate processes
> and
> > that a failure of the RM should only affect the JM in the sense that it
> > cannot ask for more resources. I believe that one could simplify things a
> > bit under the assumption that the RM and JM are always colocated in the
> > same process. However, the discussion whether to change it or not should
> > indeed be a separate one.
> >
> > Changing the slot protocol to a declarative resource management should
> > already solve the first problem you have described because we won't ask
> for
> > new slots in case of a failover but simply keep the same resource
> > requirements declared and let the RM make sure that we will receive at
> > least this amount of slots.
> >
> > If releasing a slot should lead to allocating new resources because
> > decreasing the resource requirement declaration takes longer than
> releasing
> > the slot on the TM, then we could apply what Chesnay said. By waiting on
> > the confirmation of the resource requirement decrease and then freeing
> the
> > slot on the TM gives you effectively the same behaviour as if the freeing
> > of the slot would be done by the RM.
> >
> > I am not entirely sure whether allocating the slots and receiving the
> slot
> > offers through the RM will allow us to get rid of the pending slot state
> on
> > the RM side. If the RM needs to communicate with the TM and we want to
> have
> > a reconciliation protocol between these components, then I think we would
> > have to solve the exact same problem of currently waiting on the TM for
> > confirming that a slot has been allocated.
> >
> > # Implications for the scheduling
> >
> > The FLIP does not fully cover the changes for the scheduler and mainly
> > drafts the rough idea. For the batch scheduling, I believe that we have a
> > couple degrees of freedom in how to do things. In the scenario you
> > described, one could choose a simple strategy where we wait for all
> > producers to stop before deciding on the parallelism of the consumer and
> > scheduling the respective tasks (even though they have POINTWISE BLOCKING
> > edges). Or we can try to be smart and say if we get at least one slot
> that
> > we can run the consumers with the same parallelism as the producers it
> just
> > might be that we have to run them one after another in a single slot. One
> > advantage of not directly schedule the first consumer when the first
> > producer is finished is that one might schedule the consumer stage with a
> > higher parallelism because one might acquire more resources a bit later.
> > But I would see this as different execution strategies which have
> different
> > properties.
> >
> > Cheers,
> > Till
> >
> > On Fri, Aug 28, 2020 at 11:21 AM Zhu Zhu <[hidden email]> wrote:
> >
> > > Thanks for the explanation @Chesnay Schepler <[hidden email]> .
> > >
> > > Yes, for batch jobs it can be safe to schedule downstream vertices if
> > > there
> > > are enough slots in the pool, even if these slots are still in use at
> > that
> > > moment.
> > > And the job can still progress even if the vertices stick to the
> original
> > > parallelism.
> > >
> > > Looks to me several decision makings can be different for streaming and
> > > batch jobs.
> > > Looking forward to the follow-up FLIP on the lazy ExecutionGraph
> > > construction!
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Chesnay Schepler <[hidden email]> 于2020年8月28日周五 下午4:35写道:
> > >
> > >> Maybe :)
> > >>
> > >> Imagine a case where the producer and consumer have the same
> > >> ResourceProfile, or at least one where the consumer requirements are
> > less
> > >> than the producer ones.
> > >> In this case, the scheduler can happily schedule consumers, because it
> > >> knows it will get enough slots.
> > >>
> > >> If the profiles are different, then the Scheduler _may_ wait
> > >> numberOf(producer) slots; it _may_ also stick with the parallelism and
> > >> schedule right away, in the worst case running the consumers in
> > sequence.
> > >> In fact, for batch jobs there is probably(?) never a reason for the
> > >> scheduler to _reduce_ the parallelism; it can always try to run things
> > in
> > >> sequence if it doesn't get enough slots.
> > >> Reducing the parallelism would just mean that you'd have to wait for
> > more
> > >> producers to finish.
> > >>
> > >> The scope of this FLIP is just the protocol, without changes to the
> > >> scheduler; in other words just changing how slots are acquired, but
> > change
> > >> nothing about the scheduling. That is tackled in a follow-up FLIP.
> > >>
> > >> On 28/08/2020 07:34, Zhu Zhu wrote:
> > >>
> > >> Thanks for the response!
> > >>
> > >> >> The scheduler doesn't have to wait for one stage to finish
> > >> Does it mean we will declare resources and decide the parallelism for
> a
> > >> stage which is partially
> > >> schedulable, i.e. when input data are ready just for part of the
> > >> execution vertices?
> > >>
> > >> >> This will get more complicated once we allow the scheduler to
> change
> > >> the parallelism while the job is running
> > >> Agreed. Looks to me it's a problem for batch jobs only and can be
> > avoided
> > >> for streaming jobs.
> > >> Will this FLIP limit its scope to streaming jobs, and improvements for
> > >> batch jobs are to be done later?
> > >>
> > >> Thanks,
> > >> Zhu
> > >>
> > >> Chesnay Schepler <[hidden email]> 于2020年8月28日周五 上午2:27写道:
> > >>
> > >>> The scheduler doesn't have to wait for one stage to finish. It is
> still
> > >>> aware that the upstream execution vertex has finished, and can
> > request/use
> > >>> slots accordingly to schedule the consumer.
> > >>>
> > >>> This will get more complicated once we allow the scheduler to change
> > the
> > >>> parallelism while the job is running, for which we will need some
> > >>> enhancements to the network stack to allow the producer to run
> without
> > >>> knowing the consumer parallelism ahead of time. I'm not too clear on
> > the
> > >>> details, but we'll some form of keygroup-like approach for sub
> > partitions
> > >>> (maxParallelism and all that).
> > >>>
> > >>> On 27/08/2020 20:05, Zhu Zhu wrote:
> > >>>
> > >>> Thanks Chesnay&Till for proposing this improvement.
> > >>> It's of good value to allow jobs to make best use of available
> > resources
> > >>> adaptively. Not
> > >>> to mention it further supports reactive mode.
> > >>> So big +1 for it.
> > >>>
> > >>> I have a minor concern about possible regression in certain cases due
> > to
> > >>> the proposed
> > >>> JobVertex-wise scheduling which replaces current ExecutionVertex-wise
> > >>> scheduling.
> > >>> In the proposal, looks to me it requires a stage to finish before its
> > >>> consumer stage can be
> > >>> scheduled. This limitation, however, does not exist in current
> > >>> scheduler. In the case that there
> > >>> exists a POINTWISE BLOCKING edge, the downstream execution region can
> > be
> > >>> scheduled
> > >>> right after its connected upstream execution vertices finishes, even
> > >>> before the whole upstream
> > >>> stage finishes. This allows the region to be launched earlier and
> make
> > >>> use of available resources.
> > >>> Do we need to let the new scheduler retain this property?
> > >>>
> > >>> Thanks,
> > >>> Zhu
> > >>>
> > >>> Xintong Song <[hidden email]> 于2020年8月26日周三 下午6:59写道:
> > >>>
> > >>>> Thanks for the quick response.
> > >>>>
> > >>>> *Job prioritization, Allocation IDs, Minimum resource
> > >>>> requirements, SlotManager Implementation Plan:* Sounds good to me.
> > >>>>
> > >>>> *FLIP-56*
> > >>>> Good point about the trade-off. I believe maximum resource
> utilization
> > >>>> and
> > >>>> quick deployment are desired in different scenarios. E.g., a long
> > >>>> running
> > >>>> streaming job deserves some deployment latency to improve the
> resource
> > >>>> utilization, which benefits the entire lifecycle of the job. On the
> > >>>> other
> > >>>> hand, short batch queries may prefer quick deployment, otherwise the
> > >>>> time
> > >>>> for resource allocation might significantly increase the response
> > time.
> > >>>> It would be good enough for me to bring these questions to
> attention.
> > >>>> Nothing that I'm aware of should block this FLIP.
> > >>>>
> > >>>> Thank you~
> > >>>>
> > >>>> Xintong Song
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <
> [hidden email]>
> > >>>> wrote:
> > >>>>
> > >>>> > Thank you Xintong for your questions!
> > >>>> > Job prioritization
> > >>>> > Yes, the job which declares it's initial requirements first is
> > >>>> prioritized.
> > >>>> > This is very much for simplicity; for example this avoids the
> nasty
> > >>>> case
> > >>>> > where all jobs get some resources, but none get enough to actually
> > >>>> run the
> > >>>> > job.
> > >>>> > Minimum resource requirements
> > >>>> >
> > >>>> > My bad; at some point we want to allow the JobMaster to declare a
> > >>>> range of
> > >>>> > resources it could use to run a job, for example min=1, target=10,
> > >>>> > max=+inf.
> > >>>> >
> > >>>> > With this model, the RM would then try to balance the resources
> such
> > >>>> that
> > >>>> > as many jobs as possible are as close to the target state as
> > possible.
> > >>>> >
> > >>>> > Currently, the minimum/target/maximum resources are all the same.
> So
> > >>>> the
> > >>>> > notification is sent whenever the current requirements cannot be
> > met.
> > >>>> > Allocation IDs
> > >>>> > We do intend to, at the very least, remove AllocationIDs on the
> > >>>> > SlotManager side, as they are just not required there.
> > >>>> >
> > >>>> > On the slotpool side we have to keep them around at least until
> the
> > >>>> > existing Slotpool implementations are removed (not sure whether
> > we'll
> > >>>> fully
> > >>>> > commit to this in 1.12), since the interfaces use AllocationIDs,
> > >>>> which also
> > >>>> > bleed into the JobMaster.
> > >>>> > The TaskExecutor is in a similar position.
> > >>>> > But in the long-term, yes they will be removed, and most usages
> will
> > >>>> > probably be replaced by the SlotID.
> > >>>> > FLIP-56
> > >>>> >
> > >>>> > Dynamic slot allocations are indeed quite interesting and raise a
> > few
> > >>>> > questions; for example, the main purpose of it is to ensure
> maximum
> > >>>> > resource utilization. In that case, should the JobMaster be
> allowed
> > to
> > >>>> > re-use a slot it if the task requires less resources than the slot
> > >>>> > provides, or should it always request a new slot that exactly
> > matches?
> > >>>> >
> > >>>> > There is a trade-off to be made between maximum resource
> utilization
> > >>>> > (request exactly matching slots, and only re-use exact matches)
> and
> > >>>> quicker
> > >>>> > job deployment (re-use slot even if they don't exactly match, skip
> > >>>> > round-trip to RM).
> > >>>> >
> > >>>> > As for how to handle the lack of a preemptively known SlotIDs,
> that
> > >>>> should
> > >>>> > be fine in and of itself; we already handle a similar case when we
> > >>>> request
> > >>>> > a new TaskExecutor to be started. So long as there is some way to
> > >>>> know how
> > >>>> > many resources the TaskExecutor has in total I do not see a
> problem
> > >>>> at the
> > >>>> > moment. We will get the SlotID eventually by virtue of the
> heartbeat
> > >>>> > SlotReport.
> > >>>> > Implementation plan (SlotManager)
> > >>>> > You are on the right track. The SlotManager tracks the declared
> > >>>> resource
> > >>>> > requirements, and if the requirements increased it creates a
> > >>>> SlotRequest,
> > >>>> > which then goes through similar code paths as we have at the
> moment
> > >>>> (try to
> > >>>> > find a free slot, if found tell the TM, otherwise try to request
> new
> > >>>> TM).
> > >>>> > The SlotManager changes are not that substantial to get a working
> > >>>> version;
> > >>>> > we have a PoC and most of the work went into refactoring the
> > >>>> SlotManager
> > >>>> > into a more manageable state. (split into several components,
> > >>>> stricter and
> > >>>> > simplified Slot life-cycle, ...).
> > >>>> > Offer/free slots between JM/TM
> > >>>> > Gotta run, but that's a good question and I'll think about. But I
> > >>>> think it
> > >>>> > comes down to making less changes, and being able to leverage
> > existing
> > >>>> > reconciliation protocols.
> > >>>> > Do note that TaskExecutor also explicitly inform the RM about
> freed
> > >>>> slots;
> > >>>> > the heartbeat slot report is just a safety net.
> > >>>> > I'm not sure whether slot requests are able to overtake a slot
> > >>>> release;
> > >>>> > @till do you have thoughts on that?
> > >>>> > As for the race condition between the requirements reduction and
> > slot
> > >>>> > release, if we run into problems we have the backup plan of only
> > >>>> releasing
> > >>>> > the slot after the requirement reduction has been acknowledged.
> > >>>> >
> > >>>> > On 26/08/2020 10:31, Xintong Song wrote:
> > >>>> >
> > >>>> > Thanks for preparing the FLIP and driving this discussion,
> @Chesnay
> > &
> > >>>> @Till.
> > >>>> >
> > >>>> > I really like the idea. I see a great value in the proposed
> > >>>> declarative
> > >>>> > resource management, in terms of flexibility, usability and
> > >>>> efficiency.
> > >>>> >
> > >>>> > I have a few comments and questions regarding the FLIP design. In
> > >>>> general,
> > >>>> > the protocol design makes good sense to me. My main concern is
> that
> > >>>> it is
> > >>>> > not very clear to me what changes are required from the
> > >>>> > Resource/SlotManager side to adapt to the new protocol.
> > >>>> >
> > >>>> > *1. Distributed slots across different jobs*
> > >>>> >
> > >>>> > Jobs which register their requirements first, will have precedence
> > >>>> over
> > >>>> >
> > >>>> > other jobs also if the requirements change during the runtime.
> > >>>> >
> > >>>> > Just trying to understand, does this mean jobs are prioritized by
> > the
> > >>>> order
> > >>>> > of their first resource declaring?
> > >>>> >
> > >>>> > *2. AllocationID*
> > >>>> >
> > >>>> > Is this FLIP suggesting to completely remove AllocationID?
> > >>>> >
> > >>>> > I'm fine with this change. It seems where AllocationID is used can
> > >>>> either
> > >>>> > be removed or be replaced by JobID. This reflects the concept that
> > >>>> slots
> > >>>> > are now assigned to a job instead of its individual slot requests.
> > >>>> >
> > >>>> > I would like to bring to attention that this also requires changes
> > on
> > >>>> the
> > >>>> > TM side, with respect to FLIP-56[1].
> > >>>> >
> > >>>> > In the context of dynamic slot allocation introduced by FLIP-56,
> > >>>> slots do
> > >>>> > not pre-exist on TM and are dynamically created when RM calls
> > >>>> > TaskExecutorGateway.requestSlot. Since the slots do not pre-exist,
> > nor
> > >>>> > their SlotIDs, RM requests slots from TM with a special SlotID
> > >>>> (negative
> > >>>> > slot index). The semantic changes from "requesting the slot
> > >>>> identified by
> > >>>> > the given SlotID" to "requesting a slot with the given resource
> > >>>> profile".
> > >>>> > The AllocationID is used for identifying the dynamic slots in such
> > >>>> cases.
> > >>>> >
> > >>>> > >From the perspective of FLIP-56 and fine grained resource
> > >>>> management, I'm
> > >>>> > fine with removing AllocationID. In the meantime, we would need TM
> > to
> > >>>> > recognize the special negative indexed SlotID and generate a new
> > >>>> unique
> > >>>> > SlotID for identifying the slot.
> > >>>> >
> > >>>> > *3. Minimum resource requirement*
> > >>>> >
> > >>>> > However, we can let the JobMaster know if we cannot fulfill the
> > >>>> minimum
> > >>>> >
> > >>>> > resource requirement for a job after
> > >>>> > resourcemanager.standalone.start-up-time has passed.
> > >>>> >
> > >>>> > What is the "minimum resource requirement for a job"? Did I
> overlook
> > >>>> > anything?
> > >>>> >
> > >>>> > *4. Offer/free slots between JM/TM*
> > >>>> >
> > >>>> > This probably deserves a separate discussion thread. Just want to
> > >>>> bring it
> > >>>> > up.
> > >>>> >
> > >>>> > The idea has been coming to me for quite some time. Is this
> design,
> > >>>> that JM
> > >>>> > requests resources from RM while accepting/releasing resources
> > >>>> from/to TM,
> > >>>> > the right thing?
> > >>>> >
> > >>>> > The pain point is that events of JM's activities
> > (requesting/releasing
> > >>>> > resources) arrive at RM out of order. This leads to several
> > problems.
> > >>>> >
> > >>>> >    - When a job fails and task cancelation takes long, some of the
> > >>>> slots
> > >>>> >    might be released from the slot pool due to being unused for a
> > >>>> while. Then
> > >>>> >    the job restarts and requests these slots again. At this time,
> RM
> > >>>> may
> > >>>> >    receive slot requests before noticing from TM heartbeats that
> > >>>> previous
> > >>>> >    slots are released, thus requesting new resources. I've seen
> many
> > >>>> times
> > >>>> >    that the Yarn cluster has a heavy load and is not allocating
> > >>>> resources
> > >>>> >    quickly enough, which leads to slot request timeout and job
> > >>>> failover, and
> > >>>> >    during the failover more resources are requested which adds
> more
> > >>>> load to
> > >>>> >    the Yarn cluster. Happily, this should be improved with the
> > >>>> declarative
> > >>>> >    resource management. :)
> > >>>> >    - As described in this FLIP, it is possible that RM learns the
> > >>>> releasing
> > >>>> >    of slots from TM heartbeat before noticing the resource
> > requirement
> > >>>> >    decreasing, it may allocate more resources which need to be
> > >>>> released soon.
> > >>>> >    - It complicates the ResourceManager/SlotManager, by requiring
> an
> > >>>> >    additional slot state PENDING, which means the slot is assigned
> > by
> > >>>> RM but
> > >>>> >    is not confirmed successfully ordered by TM.
> > >>>> >
> > >>>> > Why not just make RM offer the allocated resources (TM address,
> > >>>> SlotID,
> > >>>> > etc.) to JM, and JM release resources to RM? So that for all the
> > >>>> resource
> > >>>> > management JM talks to RM, and for the task deployment and
> execution
> > >>>> it
> > >>>> > talks to TM?
> > >>>> >
> > >>>> > I tried to understand the benefits for having the current design,
> > and
> > >>>> found
> > >>>> > the following in FLIP-6[2].
> > >>>> >
> > >>>> >
> > >>>> > All that the ResourceManager does is negotiate between the
> > >>>> > cluster-manager, the JobManager, and the TaskManagers. Its state
> can
> > >>>> hence
> > >>>> > be reconstructed from re-acquiring containers and re-registration
> > from
> > >>>> > JobManagers and TaskManagers
> > >>>> >
> > >>>> > Correct me if I'm wrong, it seems the original purpose is to make
> > >>>> sure the
> > >>>> > assignment between jobs and slots are confirmed between JM and
> TMs,
> > >>>> so that
> > >>>> > failures of RM will not lead to any inconsistency. However, this
> > only
> > >>>> > benefits scenarios where RM fails while JM and TMs live.
> Currently,
> > >>>> JM and
> > >>>> > RM are in the same process. We do not really have any scenario
> where
> > >>>> RM
> > >>>> > fails alone. We might separate JM and RM to different processes in
> > >>>> future,
> > >>>> > but as far as I can see we don't have such requirements at the
> > >>>> moment. It
> > >>>> > seems to me that we are suffering the current problems, complying
> to
> > >>>> > potential future benefits.
> > >>>> >
> > >>>> > Maybe I overlooked something.
> > >>>> >
> > >>>> > *5. Implementation Plan*
> > >>>> >
> > >>>> > For SlotPool, it sounds quite straightforward to "aggregate
> > >>>> individual slot
> > >>>> > requests".
> > >>>> >
> > >>>> > For Resource/SlotManager, it seems there are quite a lot changes
> > >>>> needed,
> > >>>> > with the removal of individual slot requests and AllocationID.
> It's
> > >>>> not
> > >>>> > clear to me what is the first step plan for RM/SM? Do we
> internally
> > >>>> treat
> > >>>> > the resource requirements as individual slot requests as the first
> > >>>> step, so
> > >>>> > only the interfaces are changed? Or do we actually change
> > (practically
> > >>>> > re-write) the slot allocation logics?
> > >>>> >
> > >>>> > Thank you~
> > >>>> >
> > >>>> > Xintong Song
> > >>>> >
> > >>>> >
> > >>>> > [1]
> > >>>>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> > >>>> > [2]
> > >>>>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
> > >>>> >
> > >>>> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <
> > [hidden email]>
> > >>>> <[hidden email]> wrote:
> > >>>> >
> > >>>> >
> > >>>> > Hello,
> > >>>> >
> > >>>> > in FLIP-138 we want to rework the way the JobMaster acquires
> slots,
> > >>>> such
> > >>>> > that required resources are declared before a job is scheduled and
> > th
> > >>>> > job execution is adjusted according to the provided resources
> (e.g.,
> > >>>> > reducing parallelism), instead of asking for a fixed number of
> > >>>> resources
> > >>>> > during scheduling and failing midway through if not enough
> resources
> > >>>> are
> > >>>> > available.
> > >>>> >
> > >>>> > This is a stepping stone towards reactive mode, where Flink will
> > >>>> > automatically make use of new TaskExecutors being started.
> > >>>> >
> > >>>> > More details can be found here
> > >>>> > <
> > >>>>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
> > >>>> >
> > >>>> > .
> > >>>> >
> > >>>> >
> > >>>> >
> > >>>>
> > >>>
> > >>>
> > >>
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-138: Declarative Resource management

Till Rohrmann
Thanks for the feedback Xintong and Zhu Zhu. I've added a bit more details
for the intended interface extensions, potential follow ups (removing the
AllocationIDs) and the question about whether to reuse or return a slot if
the profiles don't fully match.

If nobody objects, then I would start a vote for this FLIP soon.

Cheers,
Till

On Mon, Aug 31, 2020 at 11:53 AM Zhu Zhu <[hidden email]> wrote:

> Thanks for the clarification @Till Rohrmann <[hidden email]>
>
> >> # Implications for the scheduling
> Agreed that it turned out to be different execution strategies for batch
> jobs.
> We can have a simple one first and improve it later.
>
> Thanks,
> Zhu
>
> Xintong Song <[hidden email]> 于2020年8月31日周一 下午3:05写道:
>
>> Thanks for the clarification, @Till.
>>
>> - For FLIP-56, sounds good to me. I think there should be no problem
>> before
>> removing AllocationID. And even after replacing AllocationID, it should
>> only require limited effort to make FLIP-56 work with SlotID. I was just
>> trying to understand when the effort will be needed.
>>
>> - For offer/release slots between JM/TM, I think you are right.
>> Waiting on the confirmation for resource requirement decrease before
>> freeing the slot is quite equivalent to releasing slots through RM, in
>> terms of it practically preventing JM from releasing slots when the RM is
>> absent. But this approach obviously requires less change to the current
>> mechanism.
>> Since the first problem can be solved by the declarative protocol, and the
>> second problem can be addressed by this confirmation based approach, ATM I
>> don't see any strong reason for changing to offering and releasing slots
>> through RM, especially considering the significant changes it requires.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Fri, Aug 28, 2020 at 10:07 PM Till Rohrmann <[hidden email]>
>> wrote:
>>
>> > Thanks for creating this FLIP @Chesnay and the good input @Xintong and
>> @Zhu
>> > Zhu.
>> >
>> > Let me try to add some comments concerning your questions:
>> >
>> > # FLIP-56
>> >
>> > I think there is nothing fundamentally contradicting FLIP-56 in the FLIP
>> > for declarative resource management. As Chesnay said, we have to keep
>> the
>> > AllocationID around as long as we have the old scheduler implementation.
>> > Once it is replaced, we can think about using the SlotID instead of
>> > AllocationIDs for identifying allocated slots. For dynamic slots we can
>> > keep the special meaning of a SlotID with a negative index. In the
>> future
>> > we might think about making this encoding a bit more explicit by
>> sending a
>> > richer slot request object and reporting the actual SlotID back to the
>> RM.
>> >
>> > For the question of resource utilization vs. deployment latency I
>> believe
>> > that this will be a question of requirements and preferences as you've
>> said
>> > Xintong. I can see that we will have different strategies to fulfill the
>> > different needs.
>> >
>> > # Offer/free slots between JM/TM
>> >
>> > You are right Xintong that the existing slot protocol was developed with
>> > the assumption in mind that the RM and JM can run in separate processes
>> and
>> > that a failure of the RM should only affect the JM in the sense that it
>> > cannot ask for more resources. I believe that one could simplify things
>> a
>> > bit under the assumption that the RM and JM are always colocated in the
>> > same process. However, the discussion whether to change it or not should
>> > indeed be a separate one.
>> >
>> > Changing the slot protocol to a declarative resource management should
>> > already solve the first problem you have described because we won't ask
>> for
>> > new slots in case of a failover but simply keep the same resource
>> > requirements declared and let the RM make sure that we will receive at
>> > least this amount of slots.
>> >
>> > If releasing a slot should lead to allocating new resources because
>> > decreasing the resource requirement declaration takes longer than
>> releasing
>> > the slot on the TM, then we could apply what Chesnay said. By waiting on
>> > the confirmation of the resource requirement decrease and then freeing
>> the
>> > slot on the TM gives you effectively the same behaviour as if the
>> freeing
>> > of the slot would be done by the RM.
>> >
>> > I am not entirely sure whether allocating the slots and receiving the
>> slot
>> > offers through the RM will allow us to get rid of the pending slot
>> state on
>> > the RM side. If the RM needs to communicate with the TM and we want to
>> have
>> > a reconciliation protocol between these components, then I think we
>> would
>> > have to solve the exact same problem of currently waiting on the TM for
>> > confirming that a slot has been allocated.
>> >
>> > # Implications for the scheduling
>> >
>> > The FLIP does not fully cover the changes for the scheduler and mainly
>> > drafts the rough idea. For the batch scheduling, I believe that we have
>> a
>> > couple degrees of freedom in how to do things. In the scenario you
>> > described, one could choose a simple strategy where we wait for all
>> > producers to stop before deciding on the parallelism of the consumer and
>> > scheduling the respective tasks (even though they have POINTWISE
>> BLOCKING
>> > edges). Or we can try to be smart and say if we get at least one slot
>> that
>> > we can run the consumers with the same parallelism as the producers it
>> just
>> > might be that we have to run them one after another in a single slot.
>> One
>> > advantage of not directly schedule the first consumer when the first
>> > producer is finished is that one might schedule the consumer stage with
>> a
>> > higher parallelism because one might acquire more resources a bit later.
>> > But I would see this as different execution strategies which have
>> different
>> > properties.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Fri, Aug 28, 2020 at 11:21 AM Zhu Zhu <[hidden email]> wrote:
>> >
>> > > Thanks for the explanation @Chesnay Schepler <[hidden email]> .
>> > >
>> > > Yes, for batch jobs it can be safe to schedule downstream vertices if
>> > > there
>> > > are enough slots in the pool, even if these slots are still in use at
>> > that
>> > > moment.
>> > > And the job can still progress even if the vertices stick to the
>> original
>> > > parallelism.
>> > >
>> > > Looks to me several decision makings can be different for streaming
>> and
>> > > batch jobs.
>> > > Looking forward to the follow-up FLIP on the lazy ExecutionGraph
>> > > construction!
>> > >
>> > > Thanks,
>> > > Zhu
>> > >
>> > > Chesnay Schepler <[hidden email]> 于2020年8月28日周五 下午4:35写道:
>> > >
>> > >> Maybe :)
>> > >>
>> > >> Imagine a case where the producer and consumer have the same
>> > >> ResourceProfile, or at least one where the consumer requirements are
>> > less
>> > >> than the producer ones.
>> > >> In this case, the scheduler can happily schedule consumers, because
>> it
>> > >> knows it will get enough slots.
>> > >>
>> > >> If the profiles are different, then the Scheduler _may_ wait
>> > >> numberOf(producer) slots; it _may_ also stick with the parallelism
>> and
>> > >> schedule right away, in the worst case running the consumers in
>> > sequence.
>> > >> In fact, for batch jobs there is probably(?) never a reason for the
>> > >> scheduler to _reduce_ the parallelism; it can always try to run
>> things
>> > in
>> > >> sequence if it doesn't get enough slots.
>> > >> Reducing the parallelism would just mean that you'd have to wait for
>> > more
>> > >> producers to finish.
>> > >>
>> > >> The scope of this FLIP is just the protocol, without changes to the
>> > >> scheduler; in other words just changing how slots are acquired, but
>> > change
>> > >> nothing about the scheduling. That is tackled in a follow-up FLIP.
>> > >>
>> > >> On 28/08/2020 07:34, Zhu Zhu wrote:
>> > >>
>> > >> Thanks for the response!
>> > >>
>> > >> >> The scheduler doesn't have to wait for one stage to finish
>> > >> Does it mean we will declare resources and decide the parallelism
>> for a
>> > >> stage which is partially
>> > >> schedulable, i.e. when input data are ready just for part of the
>> > >> execution vertices?
>> > >>
>> > >> >> This will get more complicated once we allow the scheduler to
>> change
>> > >> the parallelism while the job is running
>> > >> Agreed. Looks to me it's a problem for batch jobs only and can be
>> > avoided
>> > >> for streaming jobs.
>> > >> Will this FLIP limit its scope to streaming jobs, and improvements
>> for
>> > >> batch jobs are to be done later?
>> > >>
>> > >> Thanks,
>> > >> Zhu
>> > >>
>> > >> Chesnay Schepler <[hidden email]> 于2020年8月28日周五 上午2:27写道:
>> > >>
>> > >>> The scheduler doesn't have to wait for one stage to finish. It is
>> still
>> > >>> aware that the upstream execution vertex has finished, and can
>> > request/use
>> > >>> slots accordingly to schedule the consumer.
>> > >>>
>> > >>> This will get more complicated once we allow the scheduler to change
>> > the
>> > >>> parallelism while the job is running, for which we will need some
>> > >>> enhancements to the network stack to allow the producer to run
>> without
>> > >>> knowing the consumer parallelism ahead of time. I'm not too clear on
>> > the
>> > >>> details, but we'll some form of keygroup-like approach for sub
>> > partitions
>> > >>> (maxParallelism and all that).
>> > >>>
>> > >>> On 27/08/2020 20:05, Zhu Zhu wrote:
>> > >>>
>> > >>> Thanks Chesnay&Till for proposing this improvement.
>> > >>> It's of good value to allow jobs to make best use of available
>> > resources
>> > >>> adaptively. Not
>> > >>> to mention it further supports reactive mode.
>> > >>> So big +1 for it.
>> > >>>
>> > >>> I have a minor concern about possible regression in certain cases
>> due
>> > to
>> > >>> the proposed
>> > >>> JobVertex-wise scheduling which replaces current
>> ExecutionVertex-wise
>> > >>> scheduling.
>> > >>> In the proposal, looks to me it requires a stage to finish before
>> its
>> > >>> consumer stage can be
>> > >>> scheduled. This limitation, however, does not exist in current
>> > >>> scheduler. In the case that there
>> > >>> exists a POINTWISE BLOCKING edge, the downstream execution region
>> can
>> > be
>> > >>> scheduled
>> > >>> right after its connected upstream execution vertices finishes, even
>> > >>> before the whole upstream
>> > >>> stage finishes. This allows the region to be launched earlier and
>> make
>> > >>> use of available resources.
>> > >>> Do we need to let the new scheduler retain this property?
>> > >>>
>> > >>> Thanks,
>> > >>> Zhu
>> > >>>
>> > >>> Xintong Song <[hidden email]> 于2020年8月26日周三 下午6:59写道:
>> > >>>
>> > >>>> Thanks for the quick response.
>> > >>>>
>> > >>>> *Job prioritization, Allocation IDs, Minimum resource
>> > >>>> requirements, SlotManager Implementation Plan:* Sounds good to me.
>> > >>>>
>> > >>>> *FLIP-56*
>> > >>>> Good point about the trade-off. I believe maximum resource
>> utilization
>> > >>>> and
>> > >>>> quick deployment are desired in different scenarios. E.g., a long
>> > >>>> running
>> > >>>> streaming job deserves some deployment latency to improve the
>> resource
>> > >>>> utilization, which benefits the entire lifecycle of the job. On the
>> > >>>> other
>> > >>>> hand, short batch queries may prefer quick deployment, otherwise
>> the
>> > >>>> time
>> > >>>> for resource allocation might significantly increase the response
>> > time.
>> > >>>> It would be good enough for me to bring these questions to
>> attention.
>> > >>>> Nothing that I'm aware of should block this FLIP.
>> > >>>>
>> > >>>> Thank you~
>> > >>>>
>> > >>>> Xintong Song
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <
>> [hidden email]>
>> > >>>> wrote:
>> > >>>>
>> > >>>> > Thank you Xintong for your questions!
>> > >>>> > Job prioritization
>> > >>>> > Yes, the job which declares it's initial requirements first is
>> > >>>> prioritized.
>> > >>>> > This is very much for simplicity; for example this avoids the
>> nasty
>> > >>>> case
>> > >>>> > where all jobs get some resources, but none get enough to
>> actually
>> > >>>> run the
>> > >>>> > job.
>> > >>>> > Minimum resource requirements
>> > >>>> >
>> > >>>> > My bad; at some point we want to allow the JobMaster to declare a
>> > >>>> range of
>> > >>>> > resources it could use to run a job, for example min=1,
>> target=10,
>> > >>>> > max=+inf.
>> > >>>> >
>> > >>>> > With this model, the RM would then try to balance the resources
>> such
>> > >>>> that
>> > >>>> > as many jobs as possible are as close to the target state as
>> > possible.
>> > >>>> >
>> > >>>> > Currently, the minimum/target/maximum resources are all the
>> same. So
>> > >>>> the
>> > >>>> > notification is sent whenever the current requirements cannot be
>> > met.
>> > >>>> > Allocation IDs
>> > >>>> > We do intend to, at the very least, remove AllocationIDs on the
>> > >>>> > SlotManager side, as they are just not required there.
>> > >>>> >
>> > >>>> > On the slotpool side we have to keep them around at least until
>> the
>> > >>>> > existing Slotpool implementations are removed (not sure whether
>> > we'll
>> > >>>> fully
>> > >>>> > commit to this in 1.12), since the interfaces use AllocationIDs,
>> > >>>> which also
>> > >>>> > bleed into the JobMaster.
>> > >>>> > The TaskExecutor is in a similar position.
>> > >>>> > But in the long-term, yes they will be removed, and most usages
>> will
>> > >>>> > probably be replaced by the SlotID.
>> > >>>> > FLIP-56
>> > >>>> >
>> > >>>> > Dynamic slot allocations are indeed quite interesting and raise a
>> > few
>> > >>>> > questions; for example, the main purpose of it is to ensure
>> maximum
>> > >>>> > resource utilization. In that case, should the JobMaster be
>> allowed
>> > to
>> > >>>> > re-use a slot it if the task requires less resources than the
>> slot
>> > >>>> > provides, or should it always request a new slot that exactly
>> > matches?
>> > >>>> >
>> > >>>> > There is a trade-off to be made between maximum resource
>> utilization
>> > >>>> > (request exactly matching slots, and only re-use exact matches)
>> and
>> > >>>> quicker
>> > >>>> > job deployment (re-use slot even if they don't exactly match,
>> skip
>> > >>>> > round-trip to RM).
>> > >>>> >
>> > >>>> > As for how to handle the lack of a preemptively known SlotIDs,
>> that
>> > >>>> should
>> > >>>> > be fine in and of itself; we already handle a similar case when
>> we
>> > >>>> request
>> > >>>> > a new TaskExecutor to be started. So long as there is some way to
>> > >>>> know how
>> > >>>> > many resources the TaskExecutor has in total I do not see a
>> problem
>> > >>>> at the
>> > >>>> > moment. We will get the SlotID eventually by virtue of the
>> heartbeat
>> > >>>> > SlotReport.
>> > >>>> > Implementation plan (SlotManager)
>> > >>>> > You are on the right track. The SlotManager tracks the declared
>> > >>>> resource
>> > >>>> > requirements, and if the requirements increased it creates a
>> > >>>> SlotRequest,
>> > >>>> > which then goes through similar code paths as we have at the
>> moment
>> > >>>> (try to
>> > >>>> > find a free slot, if found tell the TM, otherwise try to request
>> new
>> > >>>> TM).
>> > >>>> > The SlotManager changes are not that substantial to get a working
>> > >>>> version;
>> > >>>> > we have a PoC and most of the work went into refactoring the
>> > >>>> SlotManager
>> > >>>> > into a more manageable state. (split into several components,
>> > >>>> stricter and
>> > >>>> > simplified Slot life-cycle, ...).
>> > >>>> > Offer/free slots between JM/TM
>> > >>>> > Gotta run, but that's a good question and I'll think about. But I
>> > >>>> think it
>> > >>>> > comes down to making less changes, and being able to leverage
>> > existing
>> > >>>> > reconciliation protocols.
>> > >>>> > Do note that TaskExecutor also explicitly inform the RM about
>> freed
>> > >>>> slots;
>> > >>>> > the heartbeat slot report is just a safety net.
>> > >>>> > I'm not sure whether slot requests are able to overtake a slot
>> > >>>> release;
>> > >>>> > @till do you have thoughts on that?
>> > >>>> > As for the race condition between the requirements reduction and
>> > slot
>> > >>>> > release, if we run into problems we have the backup plan of only
>> > >>>> releasing
>> > >>>> > the slot after the requirement reduction has been acknowledged.
>> > >>>> >
>> > >>>> > On 26/08/2020 10:31, Xintong Song wrote:
>> > >>>> >
>> > >>>> > Thanks for preparing the FLIP and driving this discussion,
>> @Chesnay
>> > &
>> > >>>> @Till.
>> > >>>> >
>> > >>>> > I really like the idea. I see a great value in the proposed
>> > >>>> declarative
>> > >>>> > resource management, in terms of flexibility, usability and
>> > >>>> efficiency.
>> > >>>> >
>> > >>>> > I have a few comments and questions regarding the FLIP design. In
>> > >>>> general,
>> > >>>> > the protocol design makes good sense to me. My main concern is
>> that
>> > >>>> it is
>> > >>>> > not very clear to me what changes are required from the
>> > >>>> > Resource/SlotManager side to adapt to the new protocol.
>> > >>>> >
>> > >>>> > *1. Distributed slots across different jobs*
>> > >>>> >
>> > >>>> > Jobs which register their requirements first, will have
>> precedence
>> > >>>> over
>> > >>>> >
>> > >>>> > other jobs also if the requirements change during the runtime.
>> > >>>> >
>> > >>>> > Just trying to understand, does this mean jobs are prioritized by
>> > the
>> > >>>> order
>> > >>>> > of their first resource declaring?
>> > >>>> >
>> > >>>> > *2. AllocationID*
>> > >>>> >
>> > >>>> > Is this FLIP suggesting to completely remove AllocationID?
>> > >>>> >
>> > >>>> > I'm fine with this change. It seems where AllocationID is used
>> can
>> > >>>> either
>> > >>>> > be removed or be replaced by JobID. This reflects the concept
>> that
>> > >>>> slots
>> > >>>> > are now assigned to a job instead of its individual slot
>> requests.
>> > >>>> >
>> > >>>> > I would like to bring to attention that this also requires
>> changes
>> > on
>> > >>>> the
>> > >>>> > TM side, with respect to FLIP-56[1].
>> > >>>> >
>> > >>>> > In the context of dynamic slot allocation introduced by FLIP-56,
>> > >>>> slots do
>> > >>>> > not pre-exist on TM and are dynamically created when RM calls
>> > >>>> > TaskExecutorGateway.requestSlot. Since the slots do not
>> pre-exist,
>> > nor
>> > >>>> > their SlotIDs, RM requests slots from TM with a special SlotID
>> > >>>> (negative
>> > >>>> > slot index). The semantic changes from "requesting the slot
>> > >>>> identified by
>> > >>>> > the given SlotID" to "requesting a slot with the given resource
>> > >>>> profile".
>> > >>>> > The AllocationID is used for identifying the dynamic slots in
>> such
>> > >>>> cases.
>> > >>>> >
>> > >>>> > >From the perspective of FLIP-56 and fine grained resource
>> > >>>> management, I'm
>> > >>>> > fine with removing AllocationID. In the meantime, we would need
>> TM
>> > to
>> > >>>> > recognize the special negative indexed SlotID and generate a new
>> > >>>> unique
>> > >>>> > SlotID for identifying the slot.
>> > >>>> >
>> > >>>> > *3. Minimum resource requirement*
>> > >>>> >
>> > >>>> > However, we can let the JobMaster know if we cannot fulfill the
>> > >>>> minimum
>> > >>>> >
>> > >>>> > resource requirement for a job after
>> > >>>> > resourcemanager.standalone.start-up-time has passed.
>> > >>>> >
>> > >>>> > What is the "minimum resource requirement for a job"? Did I
>> overlook
>> > >>>> > anything?
>> > >>>> >
>> > >>>> > *4. Offer/free slots between JM/TM*
>> > >>>> >
>> > >>>> > This probably deserves a separate discussion thread. Just want to
>> > >>>> bring it
>> > >>>> > up.
>> > >>>> >
>> > >>>> > The idea has been coming to me for quite some time. Is this
>> design,
>> > >>>> that JM
>> > >>>> > requests resources from RM while accepting/releasing resources
>> > >>>> from/to TM,
>> > >>>> > the right thing?
>> > >>>> >
>> > >>>> > The pain point is that events of JM's activities
>> > (requesting/releasing
>> > >>>> > resources) arrive at RM out of order. This leads to several
>> > problems.
>> > >>>> >
>> > >>>> >    - When a job fails and task cancelation takes long, some of
>> the
>> > >>>> slots
>> > >>>> >    might be released from the slot pool due to being unused for a
>> > >>>> while. Then
>> > >>>> >    the job restarts and requests these slots again. At this
>> time, RM
>> > >>>> may
>> > >>>> >    receive slot requests before noticing from TM heartbeats that
>> > >>>> previous
>> > >>>> >    slots are released, thus requesting new resources. I've seen
>> many
>> > >>>> times
>> > >>>> >    that the Yarn cluster has a heavy load and is not allocating
>> > >>>> resources
>> > >>>> >    quickly enough, which leads to slot request timeout and job
>> > >>>> failover, and
>> > >>>> >    during the failover more resources are requested which adds
>> more
>> > >>>> load to
>> > >>>> >    the Yarn cluster. Happily, this should be improved with the
>> > >>>> declarative
>> > >>>> >    resource management. :)
>> > >>>> >    - As described in this FLIP, it is possible that RM learns the
>> > >>>> releasing
>> > >>>> >    of slots from TM heartbeat before noticing the resource
>> > requirement
>> > >>>> >    decreasing, it may allocate more resources which need to be
>> > >>>> released soon.
>> > >>>> >    - It complicates the ResourceManager/SlotManager, by
>> requiring an
>> > >>>> >    additional slot state PENDING, which means the slot is
>> assigned
>> > by
>> > >>>> RM but
>> > >>>> >    is not confirmed successfully ordered by TM.
>> > >>>> >
>> > >>>> > Why not just make RM offer the allocated resources (TM address,
>> > >>>> SlotID,
>> > >>>> > etc.) to JM, and JM release resources to RM? So that for all the
>> > >>>> resource
>> > >>>> > management JM talks to RM, and for the task deployment and
>> execution
>> > >>>> it
>> > >>>> > talks to TM?
>> > >>>> >
>> > >>>> > I tried to understand the benefits for having the current design,
>> > and
>> > >>>> found
>> > >>>> > the following in FLIP-6[2].
>> > >>>> >
>> > >>>> >
>> > >>>> > All that the ResourceManager does is negotiate between the
>> > >>>> > cluster-manager, the JobManager, and the TaskManagers. Its state
>> can
>> > >>>> hence
>> > >>>> > be reconstructed from re-acquiring containers and re-registration
>> > from
>> > >>>> > JobManagers and TaskManagers
>> > >>>> >
>> > >>>> > Correct me if I'm wrong, it seems the original purpose is to make
>> > >>>> sure the
>> > >>>> > assignment between jobs and slots are confirmed between JM and
>> TMs,
>> > >>>> so that
>> > >>>> > failures of RM will not lead to any inconsistency. However, this
>> > only
>> > >>>> > benefits scenarios where RM fails while JM and TMs live.
>> Currently,
>> > >>>> JM and
>> > >>>> > RM are in the same process. We do not really have any scenario
>> where
>> > >>>> RM
>> > >>>> > fails alone. We might separate JM and RM to different processes
>> in
>> > >>>> future,
>> > >>>> > but as far as I can see we don't have such requirements at the
>> > >>>> moment. It
>> > >>>> > seems to me that we are suffering the current problems,
>> complying to
>> > >>>> > potential future benefits.
>> > >>>> >
>> > >>>> > Maybe I overlooked something.
>> > >>>> >
>> > >>>> > *5. Implementation Plan*
>> > >>>> >
>> > >>>> > For SlotPool, it sounds quite straightforward to "aggregate
>> > >>>> individual slot
>> > >>>> > requests".
>> > >>>> >
>> > >>>> > For Resource/SlotManager, it seems there are quite a lot changes
>> > >>>> needed,
>> > >>>> > with the removal of individual slot requests and AllocationID.
>> It's
>> > >>>> not
>> > >>>> > clear to me what is the first step plan for RM/SM? Do we
>> internally
>> > >>>> treat
>> > >>>> > the resource requirements as individual slot requests as the
>> first
>> > >>>> step, so
>> > >>>> > only the interfaces are changed? Or do we actually change
>> > (practically
>> > >>>> > re-write) the slot allocation logics?
>> > >>>> >
>> > >>>> > Thank you~
>> > >>>> >
>> > >>>> > Xintong Song
>> > >>>> >
>> > >>>> >
>> > >>>> > [1]
>> > >>>>
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
>> > >>>> > [2]
>> > >>>>
>> >
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>> > >>>> >
>> > >>>> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <
>> > [hidden email]>
>> > >>>> <[hidden email]> wrote:
>> > >>>> >
>> > >>>> >
>> > >>>> > Hello,
>> > >>>> >
>> > >>>> > in FLIP-138 we want to rework the way the JobMaster acquires
>> slots,
>> > >>>> such
>> > >>>> > that required resources are declared before a job is scheduled
>> and
>> > th
>> > >>>> > job execution is adjusted according to the provided resources
>> (e.g.,
>> > >>>> > reducing parallelism), instead of asking for a fixed number of
>> > >>>> resources
>> > >>>> > during scheduling and failing midway through if not enough
>> resources
>> > >>>> are
>> > >>>> > available.
>> > >>>> >
>> > >>>> > This is a stepping stone towards reactive mode, where Flink will
>> > >>>> > automatically make use of new TaskExecutors being started.
>> > >>>> >
>> > >>>> > More details can be found here
>> > >>>> > <
>> > >>>>
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
>> > >>>> >
>> > >>>> > .
>> > >>>> >
>> > >>>> >
>> > >>>> >
>> > >>>>
>> > >>>
>> > >>>
>> > >>
>> >
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-138: Declarative Resource management

Xintong Song
Thanks Till, the changes look good to me. Looking forward to the vote.

Thank you~

Xintong Song



On Fri, Sep 4, 2020 at 12:31 AM Till Rohrmann <[hidden email]> wrote:

> Thanks for the feedback Xintong and Zhu Zhu. I've added a bit more details
> for the intended interface extensions, potential follow ups (removing the
> AllocationIDs) and the question about whether to reuse or return a slot if
> the profiles don't fully match.
>
> If nobody objects, then I would start a vote for this FLIP soon.
>
> Cheers,
> Till
>
> On Mon, Aug 31, 2020 at 11:53 AM Zhu Zhu <[hidden email]> wrote:
>
> > Thanks for the clarification @Till Rohrmann <[hidden email]>
> >
> > >> # Implications for the scheduling
> > Agreed that it turned out to be different execution strategies for batch
> > jobs.
> > We can have a simple one first and improve it later.
> >
> > Thanks,
> > Zhu
> >
> > Xintong Song <[hidden email]> 于2020年8月31日周一 下午3:05写道:
> >
> >> Thanks for the clarification, @Till.
> >>
> >> - For FLIP-56, sounds good to me. I think there should be no problem
> >> before
> >> removing AllocationID. And even after replacing AllocationID, it should
> >> only require limited effort to make FLIP-56 work with SlotID. I was just
> >> trying to understand when the effort will be needed.
> >>
> >> - For offer/release slots between JM/TM, I think you are right.
> >> Waiting on the confirmation for resource requirement decrease before
> >> freeing the slot is quite equivalent to releasing slots through RM, in
> >> terms of it practically preventing JM from releasing slots when the RM
> is
> >> absent. But this approach obviously requires less change to the current
> >> mechanism.
> >> Since the first problem can be solved by the declarative protocol, and
> the
> >> second problem can be addressed by this confirmation based approach,
> ATM I
> >> don't see any strong reason for changing to offering and releasing slots
> >> through RM, especially considering the significant changes it requires.
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >>
> >> On Fri, Aug 28, 2020 at 10:07 PM Till Rohrmann <[hidden email]>
> >> wrote:
> >>
> >> > Thanks for creating this FLIP @Chesnay and the good input @Xintong and
> >> @Zhu
> >> > Zhu.
> >> >
> >> > Let me try to add some comments concerning your questions:
> >> >
> >> > # FLIP-56
> >> >
> >> > I think there is nothing fundamentally contradicting FLIP-56 in the
> FLIP
> >> > for declarative resource management. As Chesnay said, we have to keep
> >> the
> >> > AllocationID around as long as we have the old scheduler
> implementation.
> >> > Once it is replaced, we can think about using the SlotID instead of
> >> > AllocationIDs for identifying allocated slots. For dynamic slots we
> can
> >> > keep the special meaning of a SlotID with a negative index. In the
> >> future
> >> > we might think about making this encoding a bit more explicit by
> >> sending a
> >> > richer slot request object and reporting the actual SlotID back to the
> >> RM.
> >> >
> >> > For the question of resource utilization vs. deployment latency I
> >> believe
> >> > that this will be a question of requirements and preferences as you've
> >> said
> >> > Xintong. I can see that we will have different strategies to fulfill
> the
> >> > different needs.
> >> >
> >> > # Offer/free slots between JM/TM
> >> >
> >> > You are right Xintong that the existing slot protocol was developed
> with
> >> > the assumption in mind that the RM and JM can run in separate
> processes
> >> and
> >> > that a failure of the RM should only affect the JM in the sense that
> it
> >> > cannot ask for more resources. I believe that one could simplify
> things
> >> a
> >> > bit under the assumption that the RM and JM are always colocated in
> the
> >> > same process. However, the discussion whether to change it or not
> should
> >> > indeed be a separate one.
> >> >
> >> > Changing the slot protocol to a declarative resource management should
> >> > already solve the first problem you have described because we won't
> ask
> >> for
> >> > new slots in case of a failover but simply keep the same resource
> >> > requirements declared and let the RM make sure that we will receive at
> >> > least this amount of slots.
> >> >
> >> > If releasing a slot should lead to allocating new resources because
> >> > decreasing the resource requirement declaration takes longer than
> >> releasing
> >> > the slot on the TM, then we could apply what Chesnay said. By waiting
> on
> >> > the confirmation of the resource requirement decrease and then freeing
> >> the
> >> > slot on the TM gives you effectively the same behaviour as if the
> >> freeing
> >> > of the slot would be done by the RM.
> >> >
> >> > I am not entirely sure whether allocating the slots and receiving the
> >> slot
> >> > offers through the RM will allow us to get rid of the pending slot
> >> state on
> >> > the RM side. If the RM needs to communicate with the TM and we want to
> >> have
> >> > a reconciliation protocol between these components, then I think we
> >> would
> >> > have to solve the exact same problem of currently waiting on the TM
> for
> >> > confirming that a slot has been allocated.
> >> >
> >> > # Implications for the scheduling
> >> >
> >> > The FLIP does not fully cover the changes for the scheduler and mainly
> >> > drafts the rough idea. For the batch scheduling, I believe that we
> have
> >> a
> >> > couple degrees of freedom in how to do things. In the scenario you
> >> > described, one could choose a simple strategy where we wait for all
> >> > producers to stop before deciding on the parallelism of the consumer
> and
> >> > scheduling the respective tasks (even though they have POINTWISE
> >> BLOCKING
> >> > edges). Or we can try to be smart and say if we get at least one slot
> >> that
> >> > we can run the consumers with the same parallelism as the producers it
> >> just
> >> > might be that we have to run them one after another in a single slot.
> >> One
> >> > advantage of not directly schedule the first consumer when the first
> >> > producer is finished is that one might schedule the consumer stage
> with
> >> a
> >> > higher parallelism because one might acquire more resources a bit
> later.
> >> > But I would see this as different execution strategies which have
> >> different
> >> > properties.
> >> >
> >> > Cheers,
> >> > Till
> >> >
> >> > On Fri, Aug 28, 2020 at 11:21 AM Zhu Zhu <[hidden email]> wrote:
> >> >
> >> > > Thanks for the explanation @Chesnay Schepler <[hidden email]> .
> >> > >
> >> > > Yes, for batch jobs it can be safe to schedule downstream vertices
> if
> >> > > there
> >> > > are enough slots in the pool, even if these slots are still in use
> at
> >> > that
> >> > > moment.
> >> > > And the job can still progress even if the vertices stick to the
> >> original
> >> > > parallelism.
> >> > >
> >> > > Looks to me several decision makings can be different for streaming
> >> and
> >> > > batch jobs.
> >> > > Looking forward to the follow-up FLIP on the lazy ExecutionGraph
> >> > > construction!
> >> > >
> >> > > Thanks,
> >> > > Zhu
> >> > >
> >> > > Chesnay Schepler <[hidden email]> 于2020年8月28日周五 下午4:35写道:
> >> > >
> >> > >> Maybe :)
> >> > >>
> >> > >> Imagine a case where the producer and consumer have the same
> >> > >> ResourceProfile, or at least one where the consumer requirements
> are
> >> > less
> >> > >> than the producer ones.
> >> > >> In this case, the scheduler can happily schedule consumers, because
> >> it
> >> > >> knows it will get enough slots.
> >> > >>
> >> > >> If the profiles are different, then the Scheduler _may_ wait
> >> > >> numberOf(producer) slots; it _may_ also stick with the parallelism
> >> and
> >> > >> schedule right away, in the worst case running the consumers in
> >> > sequence.
> >> > >> In fact, for batch jobs there is probably(?) never a reason for the
> >> > >> scheduler to _reduce_ the parallelism; it can always try to run
> >> things
> >> > in
> >> > >> sequence if it doesn't get enough slots.
> >> > >> Reducing the parallelism would just mean that you'd have to wait
> for
> >> > more
> >> > >> producers to finish.
> >> > >>
> >> > >> The scope of this FLIP is just the protocol, without changes to the
> >> > >> scheduler; in other words just changing how slots are acquired, but
> >> > change
> >> > >> nothing about the scheduling. That is tackled in a follow-up FLIP.
> >> > >>
> >> > >> On 28/08/2020 07:34, Zhu Zhu wrote:
> >> > >>
> >> > >> Thanks for the response!
> >> > >>
> >> > >> >> The scheduler doesn't have to wait for one stage to finish
> >> > >> Does it mean we will declare resources and decide the parallelism
> >> for a
> >> > >> stage which is partially
> >> > >> schedulable, i.e. when input data are ready just for part of the
> >> > >> execution vertices?
> >> > >>
> >> > >> >> This will get more complicated once we allow the scheduler to
> >> change
> >> > >> the parallelism while the job is running
> >> > >> Agreed. Looks to me it's a problem for batch jobs only and can be
> >> > avoided
> >> > >> for streaming jobs.
> >> > >> Will this FLIP limit its scope to streaming jobs, and improvements
> >> for
> >> > >> batch jobs are to be done later?
> >> > >>
> >> > >> Thanks,
> >> > >> Zhu
> >> > >>
> >> > >> Chesnay Schepler <[hidden email]> 于2020年8月28日周五 上午2:27写道:
> >> > >>
> >> > >>> The scheduler doesn't have to wait for one stage to finish. It is
> >> still
> >> > >>> aware that the upstream execution vertex has finished, and can
> >> > request/use
> >> > >>> slots accordingly to schedule the consumer.
> >> > >>>
> >> > >>> This will get more complicated once we allow the scheduler to
> change
> >> > the
> >> > >>> parallelism while the job is running, for which we will need some
> >> > >>> enhancements to the network stack to allow the producer to run
> >> without
> >> > >>> knowing the consumer parallelism ahead of time. I'm not too clear
> on
> >> > the
> >> > >>> details, but we'll some form of keygroup-like approach for sub
> >> > partitions
> >> > >>> (maxParallelism and all that).
> >> > >>>
> >> > >>> On 27/08/2020 20:05, Zhu Zhu wrote:
> >> > >>>
> >> > >>> Thanks Chesnay&Till for proposing this improvement.
> >> > >>> It's of good value to allow jobs to make best use of available
> >> > resources
> >> > >>> adaptively. Not
> >> > >>> to mention it further supports reactive mode.
> >> > >>> So big +1 for it.
> >> > >>>
> >> > >>> I have a minor concern about possible regression in certain cases
> >> due
> >> > to
> >> > >>> the proposed
> >> > >>> JobVertex-wise scheduling which replaces current
> >> ExecutionVertex-wise
> >> > >>> scheduling.
> >> > >>> In the proposal, looks to me it requires a stage to finish before
> >> its
> >> > >>> consumer stage can be
> >> > >>> scheduled. This limitation, however, does not exist in current
> >> > >>> scheduler. In the case that there
> >> > >>> exists a POINTWISE BLOCKING edge, the downstream execution region
> >> can
> >> > be
> >> > >>> scheduled
> >> > >>> right after its connected upstream execution vertices finishes,
> even
> >> > >>> before the whole upstream
> >> > >>> stage finishes. This allows the region to be launched earlier and
> >> make
> >> > >>> use of available resources.
> >> > >>> Do we need to let the new scheduler retain this property?
> >> > >>>
> >> > >>> Thanks,
> >> > >>> Zhu
> >> > >>>
> >> > >>> Xintong Song <[hidden email]> 于2020年8月26日周三 下午6:59写道:
> >> > >>>
> >> > >>>> Thanks for the quick response.
> >> > >>>>
> >> > >>>> *Job prioritization, Allocation IDs, Minimum resource
> >> > >>>> requirements, SlotManager Implementation Plan:* Sounds good to
> me.
> >> > >>>>
> >> > >>>> *FLIP-56*
> >> > >>>> Good point about the trade-off. I believe maximum resource
> >> utilization
> >> > >>>> and
> >> > >>>> quick deployment are desired in different scenarios. E.g., a long
> >> > >>>> running
> >> > >>>> streaming job deserves some deployment latency to improve the
> >> resource
> >> > >>>> utilization, which benefits the entire lifecycle of the job. On
> the
> >> > >>>> other
> >> > >>>> hand, short batch queries may prefer quick deployment, otherwise
> >> the
> >> > >>>> time
> >> > >>>> for resource allocation might significantly increase the response
> >> > time.
> >> > >>>> It would be good enough for me to bring these questions to
> >> attention.
> >> > >>>> Nothing that I'm aware of should block this FLIP.
> >> > >>>>
> >> > >>>> Thank you~
> >> > >>>>
> >> > >>>> Xintong Song
> >> > >>>>
> >> > >>>>
> >> > >>>>
> >> > >>>> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <
> >> [hidden email]>
> >> > >>>> wrote:
> >> > >>>>
> >> > >>>> > Thank you Xintong for your questions!
> >> > >>>> > Job prioritization
> >> > >>>> > Yes, the job which declares it's initial requirements first is
> >> > >>>> prioritized.
> >> > >>>> > This is very much for simplicity; for example this avoids the
> >> nasty
> >> > >>>> case
> >> > >>>> > where all jobs get some resources, but none get enough to
> >> actually
> >> > >>>> run the
> >> > >>>> > job.
> >> > >>>> > Minimum resource requirements
> >> > >>>> >
> >> > >>>> > My bad; at some point we want to allow the JobMaster to
> declare a
> >> > >>>> range of
> >> > >>>> > resources it could use to run a job, for example min=1,
> >> target=10,
> >> > >>>> > max=+inf.
> >> > >>>> >
> >> > >>>> > With this model, the RM would then try to balance the resources
> >> such
> >> > >>>> that
> >> > >>>> > as many jobs as possible are as close to the target state as
> >> > possible.
> >> > >>>> >
> >> > >>>> > Currently, the minimum/target/maximum resources are all the
> >> same. So
> >> > >>>> the
> >> > >>>> > notification is sent whenever the current requirements cannot
> be
> >> > met.
> >> > >>>> > Allocation IDs
> >> > >>>> > We do intend to, at the very least, remove AllocationIDs on the
> >> > >>>> > SlotManager side, as they are just not required there.
> >> > >>>> >
> >> > >>>> > On the slotpool side we have to keep them around at least until
> >> the
> >> > >>>> > existing Slotpool implementations are removed (not sure whether
> >> > we'll
> >> > >>>> fully
> >> > >>>> > commit to this in 1.12), since the interfaces use
> AllocationIDs,
> >> > >>>> which also
> >> > >>>> > bleed into the JobMaster.
> >> > >>>> > The TaskExecutor is in a similar position.
> >> > >>>> > But in the long-term, yes they will be removed, and most usages
> >> will
> >> > >>>> > probably be replaced by the SlotID.
> >> > >>>> > FLIP-56
> >> > >>>> >
> >> > >>>> > Dynamic slot allocations are indeed quite interesting and
> raise a
> >> > few
> >> > >>>> > questions; for example, the main purpose of it is to ensure
> >> maximum
> >> > >>>> > resource utilization. In that case, should the JobMaster be
> >> allowed
> >> > to
> >> > >>>> > re-use a slot it if the task requires less resources than the
> >> slot
> >> > >>>> > provides, or should it always request a new slot that exactly
> >> > matches?
> >> > >>>> >
> >> > >>>> > There is a trade-off to be made between maximum resource
> >> utilization
> >> > >>>> > (request exactly matching slots, and only re-use exact matches)
> >> and
> >> > >>>> quicker
> >> > >>>> > job deployment (re-use slot even if they don't exactly match,
> >> skip
> >> > >>>> > round-trip to RM).
> >> > >>>> >
> >> > >>>> > As for how to handle the lack of a preemptively known SlotIDs,
> >> that
> >> > >>>> should
> >> > >>>> > be fine in and of itself; we already handle a similar case when
> >> we
> >> > >>>> request
> >> > >>>> > a new TaskExecutor to be started. So long as there is some way
> to
> >> > >>>> know how
> >> > >>>> > many resources the TaskExecutor has in total I do not see a
> >> problem
> >> > >>>> at the
> >> > >>>> > moment. We will get the SlotID eventually by virtue of the
> >> heartbeat
> >> > >>>> > SlotReport.
> >> > >>>> > Implementation plan (SlotManager)
> >> > >>>> > You are on the right track. The SlotManager tracks the declared
> >> > >>>> resource
> >> > >>>> > requirements, and if the requirements increased it creates a
> >> > >>>> SlotRequest,
> >> > >>>> > which then goes through similar code paths as we have at the
> >> moment
> >> > >>>> (try to
> >> > >>>> > find a free slot, if found tell the TM, otherwise try to
> request
> >> new
> >> > >>>> TM).
> >> > >>>> > The SlotManager changes are not that substantial to get a
> working
> >> > >>>> version;
> >> > >>>> > we have a PoC and most of the work went into refactoring the
> >> > >>>> SlotManager
> >> > >>>> > into a more manageable state. (split into several components,
> >> > >>>> stricter and
> >> > >>>> > simplified Slot life-cycle, ...).
> >> > >>>> > Offer/free slots between JM/TM
> >> > >>>> > Gotta run, but that's a good question and I'll think about.
> But I
> >> > >>>> think it
> >> > >>>> > comes down to making less changes, and being able to leverage
> >> > existing
> >> > >>>> > reconciliation protocols.
> >> > >>>> > Do note that TaskExecutor also explicitly inform the RM about
> >> freed
> >> > >>>> slots;
> >> > >>>> > the heartbeat slot report is just a safety net.
> >> > >>>> > I'm not sure whether slot requests are able to overtake a slot
> >> > >>>> release;
> >> > >>>> > @till do you have thoughts on that?
> >> > >>>> > As for the race condition between the requirements reduction
> and
> >> > slot
> >> > >>>> > release, if we run into problems we have the backup plan of
> only
> >> > >>>> releasing
> >> > >>>> > the slot after the requirement reduction has been acknowledged.
> >> > >>>> >
> >> > >>>> > On 26/08/2020 10:31, Xintong Song wrote:
> >> > >>>> >
> >> > >>>> > Thanks for preparing the FLIP and driving this discussion,
> >> @Chesnay
> >> > &
> >> > >>>> @Till.
> >> > >>>> >
> >> > >>>> > I really like the idea. I see a great value in the proposed
> >> > >>>> declarative
> >> > >>>> > resource management, in terms of flexibility, usability and
> >> > >>>> efficiency.
> >> > >>>> >
> >> > >>>> > I have a few comments and questions regarding the FLIP design.
> In
> >> > >>>> general,
> >> > >>>> > the protocol design makes good sense to me. My main concern is
> >> that
> >> > >>>> it is
> >> > >>>> > not very clear to me what changes are required from the
> >> > >>>> > Resource/SlotManager side to adapt to the new protocol.
> >> > >>>> >
> >> > >>>> > *1. Distributed slots across different jobs*
> >> > >>>> >
> >> > >>>> > Jobs which register their requirements first, will have
> >> precedence
> >> > >>>> over
> >> > >>>> >
> >> > >>>> > other jobs also if the requirements change during the runtime.
> >> > >>>> >
> >> > >>>> > Just trying to understand, does this mean jobs are prioritized
> by
> >> > the
> >> > >>>> order
> >> > >>>> > of their first resource declaring?
> >> > >>>> >
> >> > >>>> > *2. AllocationID*
> >> > >>>> >
> >> > >>>> > Is this FLIP suggesting to completely remove AllocationID?
> >> > >>>> >
> >> > >>>> > I'm fine with this change. It seems where AllocationID is used
> >> can
> >> > >>>> either
> >> > >>>> > be removed or be replaced by JobID. This reflects the concept
> >> that
> >> > >>>> slots
> >> > >>>> > are now assigned to a job instead of its individual slot
> >> requests.
> >> > >>>> >
> >> > >>>> > I would like to bring to attention that this also requires
> >> changes
> >> > on
> >> > >>>> the
> >> > >>>> > TM side, with respect to FLIP-56[1].
> >> > >>>> >
> >> > >>>> > In the context of dynamic slot allocation introduced by
> FLIP-56,
> >> > >>>> slots do
> >> > >>>> > not pre-exist on TM and are dynamically created when RM calls
> >> > >>>> > TaskExecutorGateway.requestSlot. Since the slots do not
> >> pre-exist,
> >> > nor
> >> > >>>> > their SlotIDs, RM requests slots from TM with a special SlotID
> >> > >>>> (negative
> >> > >>>> > slot index). The semantic changes from "requesting the slot
> >> > >>>> identified by
> >> > >>>> > the given SlotID" to "requesting a slot with the given resource
> >> > >>>> profile".
> >> > >>>> > The AllocationID is used for identifying the dynamic slots in
> >> such
> >> > >>>> cases.
> >> > >>>> >
> >> > >>>> > >From the perspective of FLIP-56 and fine grained resource
> >> > >>>> management, I'm
> >> > >>>> > fine with removing AllocationID. In the meantime, we would need
> >> TM
> >> > to
> >> > >>>> > recognize the special negative indexed SlotID and generate a
> new
> >> > >>>> unique
> >> > >>>> > SlotID for identifying the slot.
> >> > >>>> >
> >> > >>>> > *3. Minimum resource requirement*
> >> > >>>> >
> >> > >>>> > However, we can let the JobMaster know if we cannot fulfill the
> >> > >>>> minimum
> >> > >>>> >
> >> > >>>> > resource requirement for a job after
> >> > >>>> > resourcemanager.standalone.start-up-time has passed.
> >> > >>>> >
> >> > >>>> > What is the "minimum resource requirement for a job"? Did I
> >> overlook
> >> > >>>> > anything?
> >> > >>>> >
> >> > >>>> > *4. Offer/free slots between JM/TM*
> >> > >>>> >
> >> > >>>> > This probably deserves a separate discussion thread. Just want
> to
> >> > >>>> bring it
> >> > >>>> > up.
> >> > >>>> >
> >> > >>>> > The idea has been coming to me for quite some time. Is this
> >> design,
> >> > >>>> that JM
> >> > >>>> > requests resources from RM while accepting/releasing resources
> >> > >>>> from/to TM,
> >> > >>>> > the right thing?
> >> > >>>> >
> >> > >>>> > The pain point is that events of JM's activities
> >> > (requesting/releasing
> >> > >>>> > resources) arrive at RM out of order. This leads to several
> >> > problems.
> >> > >>>> >
> >> > >>>> >    - When a job fails and task cancelation takes long, some of
> >> the
> >> > >>>> slots
> >> > >>>> >    might be released from the slot pool due to being unused
> for a
> >> > >>>> while. Then
> >> > >>>> >    the job restarts and requests these slots again. At this
> >> time, RM
> >> > >>>> may
> >> > >>>> >    receive slot requests before noticing from TM heartbeats
> that
> >> > >>>> previous
> >> > >>>> >    slots are released, thus requesting new resources. I've seen
> >> many
> >> > >>>> times
> >> > >>>> >    that the Yarn cluster has a heavy load and is not allocating
> >> > >>>> resources
> >> > >>>> >    quickly enough, which leads to slot request timeout and job
> >> > >>>> failover, and
> >> > >>>> >    during the failover more resources are requested which adds
> >> more
> >> > >>>> load to
> >> > >>>> >    the Yarn cluster. Happily, this should be improved with the
> >> > >>>> declarative
> >> > >>>> >    resource management. :)
> >> > >>>> >    - As described in this FLIP, it is possible that RM learns
> the
> >> > >>>> releasing
> >> > >>>> >    of slots from TM heartbeat before noticing the resource
> >> > requirement
> >> > >>>> >    decreasing, it may allocate more resources which need to be
> >> > >>>> released soon.
> >> > >>>> >    - It complicates the ResourceManager/SlotManager, by
> >> requiring an
> >> > >>>> >    additional slot state PENDING, which means the slot is
> >> assigned
> >> > by
> >> > >>>> RM but
> >> > >>>> >    is not confirmed successfully ordered by TM.
> >> > >>>> >
> >> > >>>> > Why not just make RM offer the allocated resources (TM address,
> >> > >>>> SlotID,
> >> > >>>> > etc.) to JM, and JM release resources to RM? So that for all
> the
> >> > >>>> resource
> >> > >>>> > management JM talks to RM, and for the task deployment and
> >> execution
> >> > >>>> it
> >> > >>>> > talks to TM?
> >> > >>>> >
> >> > >>>> > I tried to understand the benefits for having the current
> design,
> >> > and
> >> > >>>> found
> >> > >>>> > the following in FLIP-6[2].
> >> > >>>> >
> >> > >>>> >
> >> > >>>> > All that the ResourceManager does is negotiate between the
> >> > >>>> > cluster-manager, the JobManager, and the TaskManagers. Its
> state
> >> can
> >> > >>>> hence
> >> > >>>> > be reconstructed from re-acquiring containers and
> re-registration
> >> > from
> >> > >>>> > JobManagers and TaskManagers
> >> > >>>> >
> >> > >>>> > Correct me if I'm wrong, it seems the original purpose is to
> make
> >> > >>>> sure the
> >> > >>>> > assignment between jobs and slots are confirmed between JM and
> >> TMs,
> >> > >>>> so that
> >> > >>>> > failures of RM will not lead to any inconsistency. However,
> this
> >> > only
> >> > >>>> > benefits scenarios where RM fails while JM and TMs live.
> >> Currently,
> >> > >>>> JM and
> >> > >>>> > RM are in the same process. We do not really have any scenario
> >> where
> >> > >>>> RM
> >> > >>>> > fails alone. We might separate JM and RM to different processes
> >> in
> >> > >>>> future,
> >> > >>>> > but as far as I can see we don't have such requirements at the
> >> > >>>> moment. It
> >> > >>>> > seems to me that we are suffering the current problems,
> >> complying to
> >> > >>>> > potential future benefits.
> >> > >>>> >
> >> > >>>> > Maybe I overlooked something.
> >> > >>>> >
> >> > >>>> > *5. Implementation Plan*
> >> > >>>> >
> >> > >>>> > For SlotPool, it sounds quite straightforward to "aggregate
> >> > >>>> individual slot
> >> > >>>> > requests".
> >> > >>>> >
> >> > >>>> > For Resource/SlotManager, it seems there are quite a lot
> changes
> >> > >>>> needed,
> >> > >>>> > with the removal of individual slot requests and AllocationID.
> >> It's
> >> > >>>> not
> >> > >>>> > clear to me what is the first step plan for RM/SM? Do we
> >> internally
> >> > >>>> treat
> >> > >>>> > the resource requirements as individual slot requests as the
> >> first
> >> > >>>> step, so
> >> > >>>> > only the interfaces are changed? Or do we actually change
> >> > (practically
> >> > >>>> > re-write) the slot allocation logics?
> >> > >>>> >
> >> > >>>> > Thank you~
> >> > >>>> >
> >> > >>>> > Xintong Song
> >> > >>>> >
> >> > >>>> >
> >> > >>>> > [1]
> >> > >>>>
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> >> > >>>> > [2]
> >> > >>>>
> >> >
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
> >> > >>>> >
> >> > >>>> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <
> >> > [hidden email]>
> >> > >>>> <[hidden email]> wrote:
> >> > >>>> >
> >> > >>>> >
> >> > >>>> > Hello,
> >> > >>>> >
> >> > >>>> > in FLIP-138 we want to rework the way the JobMaster acquires
> >> slots,
> >> > >>>> such
> >> > >>>> > that required resources are declared before a job is scheduled
> >> and
> >> > th
> >> > >>>> > job execution is adjusted according to the provided resources
> >> (e.g.,
> >> > >>>> > reducing parallelism), instead of asking for a fixed number of
> >> > >>>> resources
> >> > >>>> > during scheduling and failing midway through if not enough
> >> resources
> >> > >>>> are
> >> > >>>> > available.
> >> > >>>> >
> >> > >>>> > This is a stepping stone towards reactive mode, where Flink
> will
> >> > >>>> > automatically make use of new TaskExecutors being started.
> >> > >>>> >
> >> > >>>> > More details can be found here
> >> > >>>> > <
> >> > >>>>
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
> >> > >>>> >
> >> > >>>> > .
> >> > >>>> >
> >> > >>>> >
> >> > >>>> >
> >> > >>>>
> >> > >>>
> >> > >>>
> >> > >>
> >> >
> >>
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-138: Declarative Resource management

Zhu Zhu
The new edits look good to me.
Looking forward to the vote.

Thanks,
Zhu

Xintong Song <[hidden email]> 于2020年9月4日周五 上午9:49写道:

> Thanks Till, the changes look good to me. Looking forward to the vote.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Sep 4, 2020 at 12:31 AM Till Rohrmann <[hidden email]>
> wrote:
>
> > Thanks for the feedback Xintong and Zhu Zhu. I've added a bit more
> details
> > for the intended interface extensions, potential follow ups (removing the
> > AllocationIDs) and the question about whether to reuse or return a slot
> if
> > the profiles don't fully match.
> >
> > If nobody objects, then I would start a vote for this FLIP soon.
> >
> > Cheers,
> > Till
> >
> > On Mon, Aug 31, 2020 at 11:53 AM Zhu Zhu <[hidden email]> wrote:
> >
> > > Thanks for the clarification @Till Rohrmann <[hidden email]>
> > >
> > > >> # Implications for the scheduling
> > > Agreed that it turned out to be different execution strategies for
> batch
> > > jobs.
> > > We can have a simple one first and improve it later.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Xintong Song <[hidden email]> 于2020年8月31日周一 下午3:05写道:
> > >
> > >> Thanks for the clarification, @Till.
> > >>
> > >> - For FLIP-56, sounds good to me. I think there should be no problem
> > >> before
> > >> removing AllocationID. And even after replacing AllocationID, it
> should
> > >> only require limited effort to make FLIP-56 work with SlotID. I was
> just
> > >> trying to understand when the effort will be needed.
> > >>
> > >> - For offer/release slots between JM/TM, I think you are right.
> > >> Waiting on the confirmation for resource requirement decrease before
> > >> freeing the slot is quite equivalent to releasing slots through RM, in
> > >> terms of it practically preventing JM from releasing slots when the RM
> > is
> > >> absent. But this approach obviously requires less change to the
> current
> > >> mechanism.
> > >> Since the first problem can be solved by the declarative protocol, and
> > the
> > >> second problem can be addressed by this confirmation based approach,
> > ATM I
> > >> don't see any strong reason for changing to offering and releasing
> slots
> > >> through RM, especially considering the significant changes it
> requires.
> > >>
> > >> Thank you~
> > >>
> > >> Xintong Song
> > >>
> > >>
> > >>
> > >> On Fri, Aug 28, 2020 at 10:07 PM Till Rohrmann <[hidden email]>
> > >> wrote:
> > >>
> > >> > Thanks for creating this FLIP @Chesnay and the good input @Xintong
> and
> > >> @Zhu
> > >> > Zhu.
> > >> >
> > >> > Let me try to add some comments concerning your questions:
> > >> >
> > >> > # FLIP-56
> > >> >
> > >> > I think there is nothing fundamentally contradicting FLIP-56 in the
> > FLIP
> > >> > for declarative resource management. As Chesnay said, we have to
> keep
> > >> the
> > >> > AllocationID around as long as we have the old scheduler
> > implementation.
> > >> > Once it is replaced, we can think about using the SlotID instead of
> > >> > AllocationIDs for identifying allocated slots. For dynamic slots we
> > can
> > >> > keep the special meaning of a SlotID with a negative index. In the
> > >> future
> > >> > we might think about making this encoding a bit more explicit by
> > >> sending a
> > >> > richer slot request object and reporting the actual SlotID back to
> the
> > >> RM.
> > >> >
> > >> > For the question of resource utilization vs. deployment latency I
> > >> believe
> > >> > that this will be a question of requirements and preferences as
> you've
> > >> said
> > >> > Xintong. I can see that we will have different strategies to fulfill
> > the
> > >> > different needs.
> > >> >
> > >> > # Offer/free slots between JM/TM
> > >> >
> > >> > You are right Xintong that the existing slot protocol was developed
> > with
> > >> > the assumption in mind that the RM and JM can run in separate
> > processes
> > >> and
> > >> > that a failure of the RM should only affect the JM in the sense that
> > it
> > >> > cannot ask for more resources. I believe that one could simplify
> > things
> > >> a
> > >> > bit under the assumption that the RM and JM are always colocated in
> > the
> > >> > same process. However, the discussion whether to change it or not
> > should
> > >> > indeed be a separate one.
> > >> >
> > >> > Changing the slot protocol to a declarative resource management
> should
> > >> > already solve the first problem you have described because we won't
> > ask
> > >> for
> > >> > new slots in case of a failover but simply keep the same resource
> > >> > requirements declared and let the RM make sure that we will receive
> at
> > >> > least this amount of slots.
> > >> >
> > >> > If releasing a slot should lead to allocating new resources because
> > >> > decreasing the resource requirement declaration takes longer than
> > >> releasing
> > >> > the slot on the TM, then we could apply what Chesnay said. By
> waiting
> > on
> > >> > the confirmation of the resource requirement decrease and then
> freeing
> > >> the
> > >> > slot on the TM gives you effectively the same behaviour as if the
> > >> freeing
> > >> > of the slot would be done by the RM.
> > >> >
> > >> > I am not entirely sure whether allocating the slots and receiving
> the
> > >> slot
> > >> > offers through the RM will allow us to get rid of the pending slot
> > >> state on
> > >> > the RM side. If the RM needs to communicate with the TM and we want
> to
> > >> have
> > >> > a reconciliation protocol between these components, then I think we
> > >> would
> > >> > have to solve the exact same problem of currently waiting on the TM
> > for
> > >> > confirming that a slot has been allocated.
> > >> >
> > >> > # Implications for the scheduling
> > >> >
> > >> > The FLIP does not fully cover the changes for the scheduler and
> mainly
> > >> > drafts the rough idea. For the batch scheduling, I believe that we
> > have
> > >> a
> > >> > couple degrees of freedom in how to do things. In the scenario you
> > >> > described, one could choose a simple strategy where we wait for all
> > >> > producers to stop before deciding on the parallelism of the consumer
> > and
> > >> > scheduling the respective tasks (even though they have POINTWISE
> > >> BLOCKING
> > >> > edges). Or we can try to be smart and say if we get at least one
> slot
> > >> that
> > >> > we can run the consumers with the same parallelism as the producers
> it
> > >> just
> > >> > might be that we have to run them one after another in a single
> slot.
> > >> One
> > >> > advantage of not directly schedule the first consumer when the first
> > >> > producer is finished is that one might schedule the consumer stage
> > with
> > >> a
> > >> > higher parallelism because one might acquire more resources a bit
> > later.
> > >> > But I would see this as different execution strategies which have
> > >> different
> > >> > properties.
> > >> >
> > >> > Cheers,
> > >> > Till
> > >> >
> > >> > On Fri, Aug 28, 2020 at 11:21 AM Zhu Zhu <[hidden email]> wrote:
> > >> >
> > >> > > Thanks for the explanation @Chesnay Schepler <[hidden email]>
> .
> > >> > >
> > >> > > Yes, for batch jobs it can be safe to schedule downstream vertices
> > if
> > >> > > there
> > >> > > are enough slots in the pool, even if these slots are still in use
> > at
> > >> > that
> > >> > > moment.
> > >> > > And the job can still progress even if the vertices stick to the
> > >> original
> > >> > > parallelism.
> > >> > >
> > >> > > Looks to me several decision makings can be different for
> streaming
> > >> and
> > >> > > batch jobs.
> > >> > > Looking forward to the follow-up FLIP on the lazy ExecutionGraph
> > >> > > construction!
> > >> > >
> > >> > > Thanks,
> > >> > > Zhu
> > >> > >
> > >> > > Chesnay Schepler <[hidden email]> 于2020年8月28日周五 下午4:35写道:
> > >> > >
> > >> > >> Maybe :)
> > >> > >>
> > >> > >> Imagine a case where the producer and consumer have the same
> > >> > >> ResourceProfile, or at least one where the consumer requirements
> > are
> > >> > less
> > >> > >> than the producer ones.
> > >> > >> In this case, the scheduler can happily schedule consumers,
> because
> > >> it
> > >> > >> knows it will get enough slots.
> > >> > >>
> > >> > >> If the profiles are different, then the Scheduler _may_ wait
> > >> > >> numberOf(producer) slots; it _may_ also stick with the
> parallelism
> > >> and
> > >> > >> schedule right away, in the worst case running the consumers in
> > >> > sequence.
> > >> > >> In fact, for batch jobs there is probably(?) never a reason for
> the
> > >> > >> scheduler to _reduce_ the parallelism; it can always try to run
> > >> things
> > >> > in
> > >> > >> sequence if it doesn't get enough slots.
> > >> > >> Reducing the parallelism would just mean that you'd have to wait
> > for
> > >> > more
> > >> > >> producers to finish.
> > >> > >>
> > >> > >> The scope of this FLIP is just the protocol, without changes to
> the
> > >> > >> scheduler; in other words just changing how slots are acquired,
> but
> > >> > change
> > >> > >> nothing about the scheduling. That is tackled in a follow-up
> FLIP.
> > >> > >>
> > >> > >> On 28/08/2020 07:34, Zhu Zhu wrote:
> > >> > >>
> > >> > >> Thanks for the response!
> > >> > >>
> > >> > >> >> The scheduler doesn't have to wait for one stage to finish
> > >> > >> Does it mean we will declare resources and decide the parallelism
> > >> for a
> > >> > >> stage which is partially
> > >> > >> schedulable, i.e. when input data are ready just for part of the
> > >> > >> execution vertices?
> > >> > >>
> > >> > >> >> This will get more complicated once we allow the scheduler to
> > >> change
> > >> > >> the parallelism while the job is running
> > >> > >> Agreed. Looks to me it's a problem for batch jobs only and can be
> > >> > avoided
> > >> > >> for streaming jobs.
> > >> > >> Will this FLIP limit its scope to streaming jobs, and
> improvements
> > >> for
> > >> > >> batch jobs are to be done later?
> > >> > >>
> > >> > >> Thanks,
> > >> > >> Zhu
> > >> > >>
> > >> > >> Chesnay Schepler <[hidden email]> 于2020年8月28日周五 上午2:27写道:
> > >> > >>
> > >> > >>> The scheduler doesn't have to wait for one stage to finish. It
> is
> > >> still
> > >> > >>> aware that the upstream execution vertex has finished, and can
> > >> > request/use
> > >> > >>> slots accordingly to schedule the consumer.
> > >> > >>>
> > >> > >>> This will get more complicated once we allow the scheduler to
> > change
> > >> > the
> > >> > >>> parallelism while the job is running, for which we will need
> some
> > >> > >>> enhancements to the network stack to allow the producer to run
> > >> without
> > >> > >>> knowing the consumer parallelism ahead of time. I'm not too
> clear
> > on
> > >> > the
> > >> > >>> details, but we'll some form of keygroup-like approach for sub
> > >> > partitions
> > >> > >>> (maxParallelism and all that).
> > >> > >>>
> > >> > >>> On 27/08/2020 20:05, Zhu Zhu wrote:
> > >> > >>>
> > >> > >>> Thanks Chesnay&Till for proposing this improvement.
> > >> > >>> It's of good value to allow jobs to make best use of available
> > >> > resources
> > >> > >>> adaptively. Not
> > >> > >>> to mention it further supports reactive mode.
> > >> > >>> So big +1 for it.
> > >> > >>>
> > >> > >>> I have a minor concern about possible regression in certain
> cases
> > >> due
> > >> > to
> > >> > >>> the proposed
> > >> > >>> JobVertex-wise scheduling which replaces current
> > >> ExecutionVertex-wise
> > >> > >>> scheduling.
> > >> > >>> In the proposal, looks to me it requires a stage to finish
> before
> > >> its
> > >> > >>> consumer stage can be
> > >> > >>> scheduled. This limitation, however, does not exist in current
> > >> > >>> scheduler. In the case that there
> > >> > >>> exists a POINTWISE BLOCKING edge, the downstream execution
> region
> > >> can
> > >> > be
> > >> > >>> scheduled
> > >> > >>> right after its connected upstream execution vertices finishes,
> > even
> > >> > >>> before the whole upstream
> > >> > >>> stage finishes. This allows the region to be launched earlier
> and
> > >> make
> > >> > >>> use of available resources.
> > >> > >>> Do we need to let the new scheduler retain this property?
> > >> > >>>
> > >> > >>> Thanks,
> > >> > >>> Zhu
> > >> > >>>
> > >> > >>> Xintong Song <[hidden email]> 于2020年8月26日周三 下午6:59写道:
> > >> > >>>
> > >> > >>>> Thanks for the quick response.
> > >> > >>>>
> > >> > >>>> *Job prioritization, Allocation IDs, Minimum resource
> > >> > >>>> requirements, SlotManager Implementation Plan:* Sounds good to
> > me.
> > >> > >>>>
> > >> > >>>> *FLIP-56*
> > >> > >>>> Good point about the trade-off. I believe maximum resource
> > >> utilization
> > >> > >>>> and
> > >> > >>>> quick deployment are desired in different scenarios. E.g., a
> long
> > >> > >>>> running
> > >> > >>>> streaming job deserves some deployment latency to improve the
> > >> resource
> > >> > >>>> utilization, which benefits the entire lifecycle of the job. On
> > the
> > >> > >>>> other
> > >> > >>>> hand, short batch queries may prefer quick deployment,
> otherwise
> > >> the
> > >> > >>>> time
> > >> > >>>> for resource allocation might significantly increase the
> response
> > >> > time.
> > >> > >>>> It would be good enough for me to bring these questions to
> > >> attention.
> > >> > >>>> Nothing that I'm aware of should block this FLIP.
> > >> > >>>>
> > >> > >>>> Thank you~
> > >> > >>>>
> > >> > >>>> Xintong Song
> > >> > >>>>
> > >> > >>>>
> > >> > >>>>
> > >> > >>>> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <
> > >> [hidden email]>
> > >> > >>>> wrote:
> > >> > >>>>
> > >> > >>>> > Thank you Xintong for your questions!
> > >> > >>>> > Job prioritization
> > >> > >>>> > Yes, the job which declares it's initial requirements first
> is
> > >> > >>>> prioritized.
> > >> > >>>> > This is very much for simplicity; for example this avoids the
> > >> nasty
> > >> > >>>> case
> > >> > >>>> > where all jobs get some resources, but none get enough to
> > >> actually
> > >> > >>>> run the
> > >> > >>>> > job.
> > >> > >>>> > Minimum resource requirements
> > >> > >>>> >
> > >> > >>>> > My bad; at some point we want to allow the JobMaster to
> > declare a
> > >> > >>>> range of
> > >> > >>>> > resources it could use to run a job, for example min=1,
> > >> target=10,
> > >> > >>>> > max=+inf.
> > >> > >>>> >
> > >> > >>>> > With this model, the RM would then try to balance the
> resources
> > >> such
> > >> > >>>> that
> > >> > >>>> > as many jobs as possible are as close to the target state as
> > >> > possible.
> > >> > >>>> >
> > >> > >>>> > Currently, the minimum/target/maximum resources are all the
> > >> same. So
> > >> > >>>> the
> > >> > >>>> > notification is sent whenever the current requirements cannot
> > be
> > >> > met.
> > >> > >>>> > Allocation IDs
> > >> > >>>> > We do intend to, at the very least, remove AllocationIDs on
> the
> > >> > >>>> > SlotManager side, as they are just not required there.
> > >> > >>>> >
> > >> > >>>> > On the slotpool side we have to keep them around at least
> until
> > >> the
> > >> > >>>> > existing Slotpool implementations are removed (not sure
> whether
> > >> > we'll
> > >> > >>>> fully
> > >> > >>>> > commit to this in 1.12), since the interfaces use
> > AllocationIDs,
> > >> > >>>> which also
> > >> > >>>> > bleed into the JobMaster.
> > >> > >>>> > The TaskExecutor is in a similar position.
> > >> > >>>> > But in the long-term, yes they will be removed, and most
> usages
> > >> will
> > >> > >>>> > probably be replaced by the SlotID.
> > >> > >>>> > FLIP-56
> > >> > >>>> >
> > >> > >>>> > Dynamic slot allocations are indeed quite interesting and
> > raise a
> > >> > few
> > >> > >>>> > questions; for example, the main purpose of it is to ensure
> > >> maximum
> > >> > >>>> > resource utilization. In that case, should the JobMaster be
> > >> allowed
> > >> > to
> > >> > >>>> > re-use a slot it if the task requires less resources than the
> > >> slot
> > >> > >>>> > provides, or should it always request a new slot that exactly
> > >> > matches?
> > >> > >>>> >
> > >> > >>>> > There is a trade-off to be made between maximum resource
> > >> utilization
> > >> > >>>> > (request exactly matching slots, and only re-use exact
> matches)
> > >> and
> > >> > >>>> quicker
> > >> > >>>> > job deployment (re-use slot even if they don't exactly match,
> > >> skip
> > >> > >>>> > round-trip to RM).
> > >> > >>>> >
> > >> > >>>> > As for how to handle the lack of a preemptively known
> SlotIDs,
> > >> that
> > >> > >>>> should
> > >> > >>>> > be fine in and of itself; we already handle a similar case
> when
> > >> we
> > >> > >>>> request
> > >> > >>>> > a new TaskExecutor to be started. So long as there is some
> way
> > to
> > >> > >>>> know how
> > >> > >>>> > many resources the TaskExecutor has in total I do not see a
> > >> problem
> > >> > >>>> at the
> > >> > >>>> > moment. We will get the SlotID eventually by virtue of the
> > >> heartbeat
> > >> > >>>> > SlotReport.
> > >> > >>>> > Implementation plan (SlotManager)
> > >> > >>>> > You are on the right track. The SlotManager tracks the
> declared
> > >> > >>>> resource
> > >> > >>>> > requirements, and if the requirements increased it creates a
> > >> > >>>> SlotRequest,
> > >> > >>>> > which then goes through similar code paths as we have at the
> > >> moment
> > >> > >>>> (try to
> > >> > >>>> > find a free slot, if found tell the TM, otherwise try to
> > request
> > >> new
> > >> > >>>> TM).
> > >> > >>>> > The SlotManager changes are not that substantial to get a
> > working
> > >> > >>>> version;
> > >> > >>>> > we have a PoC and most of the work went into refactoring the
> > >> > >>>> SlotManager
> > >> > >>>> > into a more manageable state. (split into several components,
> > >> > >>>> stricter and
> > >> > >>>> > simplified Slot life-cycle, ...).
> > >> > >>>> > Offer/free slots between JM/TM
> > >> > >>>> > Gotta run, but that's a good question and I'll think about.
> > But I
> > >> > >>>> think it
> > >> > >>>> > comes down to making less changes, and being able to leverage
> > >> > existing
> > >> > >>>> > reconciliation protocols.
> > >> > >>>> > Do note that TaskExecutor also explicitly inform the RM about
> > >> freed
> > >> > >>>> slots;
> > >> > >>>> > the heartbeat slot report is just a safety net.
> > >> > >>>> > I'm not sure whether slot requests are able to overtake a
> slot
> > >> > >>>> release;
> > >> > >>>> > @till do you have thoughts on that?
> > >> > >>>> > As for the race condition between the requirements reduction
> > and
> > >> > slot
> > >> > >>>> > release, if we run into problems we have the backup plan of
> > only
> > >> > >>>> releasing
> > >> > >>>> > the slot after the requirement reduction has been
> acknowledged.
> > >> > >>>> >
> > >> > >>>> > On 26/08/2020 10:31, Xintong Song wrote:
> > >> > >>>> >
> > >> > >>>> > Thanks for preparing the FLIP and driving this discussion,
> > >> @Chesnay
> > >> > &
> > >> > >>>> @Till.
> > >> > >>>> >
> > >> > >>>> > I really like the idea. I see a great value in the proposed
> > >> > >>>> declarative
> > >> > >>>> > resource management, in terms of flexibility, usability and
> > >> > >>>> efficiency.
> > >> > >>>> >
> > >> > >>>> > I have a few comments and questions regarding the FLIP
> design.
> > In
> > >> > >>>> general,
> > >> > >>>> > the protocol design makes good sense to me. My main concern
> is
> > >> that
> > >> > >>>> it is
> > >> > >>>> > not very clear to me what changes are required from the
> > >> > >>>> > Resource/SlotManager side to adapt to the new protocol.
> > >> > >>>> >
> > >> > >>>> > *1. Distributed slots across different jobs*
> > >> > >>>> >
> > >> > >>>> > Jobs which register their requirements first, will have
> > >> precedence
> > >> > >>>> over
> > >> > >>>> >
> > >> > >>>> > other jobs also if the requirements change during the
> runtime.
> > >> > >>>> >
> > >> > >>>> > Just trying to understand, does this mean jobs are
> prioritized
> > by
> > >> > the
> > >> > >>>> order
> > >> > >>>> > of their first resource declaring?
> > >> > >>>> >
> > >> > >>>> > *2. AllocationID*
> > >> > >>>> >
> > >> > >>>> > Is this FLIP suggesting to completely remove AllocationID?
> > >> > >>>> >
> > >> > >>>> > I'm fine with this change. It seems where AllocationID is
> used
> > >> can
> > >> > >>>> either
> > >> > >>>> > be removed or be replaced by JobID. This reflects the concept
> > >> that
> > >> > >>>> slots
> > >> > >>>> > are now assigned to a job instead of its individual slot
> > >> requests.
> > >> > >>>> >
> > >> > >>>> > I would like to bring to attention that this also requires
> > >> changes
> > >> > on
> > >> > >>>> the
> > >> > >>>> > TM side, with respect to FLIP-56[1].
> > >> > >>>> >
> > >> > >>>> > In the context of dynamic slot allocation introduced by
> > FLIP-56,
> > >> > >>>> slots do
> > >> > >>>> > not pre-exist on TM and are dynamically created when RM calls
> > >> > >>>> > TaskExecutorGateway.requestSlot. Since the slots do not
> > >> pre-exist,
> > >> > nor
> > >> > >>>> > their SlotIDs, RM requests slots from TM with a special
> SlotID
> > >> > >>>> (negative
> > >> > >>>> > slot index). The semantic changes from "requesting the slot
> > >> > >>>> identified by
> > >> > >>>> > the given SlotID" to "requesting a slot with the given
> resource
> > >> > >>>> profile".
> > >> > >>>> > The AllocationID is used for identifying the dynamic slots in
> > >> such
> > >> > >>>> cases.
> > >> > >>>> >
> > >> > >>>> > >From the perspective of FLIP-56 and fine grained resource
> > >> > >>>> management, I'm
> > >> > >>>> > fine with removing AllocationID. In the meantime, we would
> need
> > >> TM
> > >> > to
> > >> > >>>> > recognize the special negative indexed SlotID and generate a
> > new
> > >> > >>>> unique
> > >> > >>>> > SlotID for identifying the slot.
> > >> > >>>> >
> > >> > >>>> > *3. Minimum resource requirement*
> > >> > >>>> >
> > >> > >>>> > However, we can let the JobMaster know if we cannot fulfill
> the
> > >> > >>>> minimum
> > >> > >>>> >
> > >> > >>>> > resource requirement for a job after
> > >> > >>>> > resourcemanager.standalone.start-up-time has passed.
> > >> > >>>> >
> > >> > >>>> > What is the "minimum resource requirement for a job"? Did I
> > >> overlook
> > >> > >>>> > anything?
> > >> > >>>> >
> > >> > >>>> > *4. Offer/free slots between JM/TM*
> > >> > >>>> >
> > >> > >>>> > This probably deserves a separate discussion thread. Just
> want
> > to
> > >> > >>>> bring it
> > >> > >>>> > up.
> > >> > >>>> >
> > >> > >>>> > The idea has been coming to me for quite some time. Is this
> > >> design,
> > >> > >>>> that JM
> > >> > >>>> > requests resources from RM while accepting/releasing
> resources
> > >> > >>>> from/to TM,
> > >> > >>>> > the right thing?
> > >> > >>>> >
> > >> > >>>> > The pain point is that events of JM's activities
> > >> > (requesting/releasing
> > >> > >>>> > resources) arrive at RM out of order. This leads to several
> > >> > problems.
> > >> > >>>> >
> > >> > >>>> >    - When a job fails and task cancelation takes long, some
> of
> > >> the
> > >> > >>>> slots
> > >> > >>>> >    might be released from the slot pool due to being unused
> > for a
> > >> > >>>> while. Then
> > >> > >>>> >    the job restarts and requests these slots again. At this
> > >> time, RM
> > >> > >>>> may
> > >> > >>>> >    receive slot requests before noticing from TM heartbeats
> > that
> > >> > >>>> previous
> > >> > >>>> >    slots are released, thus requesting new resources. I've
> seen
> > >> many
> > >> > >>>> times
> > >> > >>>> >    that the Yarn cluster has a heavy load and is not
> allocating
> > >> > >>>> resources
> > >> > >>>> >    quickly enough, which leads to slot request timeout and
> job
> > >> > >>>> failover, and
> > >> > >>>> >    during the failover more resources are requested which
> adds
> > >> more
> > >> > >>>> load to
> > >> > >>>> >    the Yarn cluster. Happily, this should be improved with
> the
> > >> > >>>> declarative
> > >> > >>>> >    resource management. :)
> > >> > >>>> >    - As described in this FLIP, it is possible that RM learns
> > the
> > >> > >>>> releasing
> > >> > >>>> >    of slots from TM heartbeat before noticing the resource
> > >> > requirement
> > >> > >>>> >    decreasing, it may allocate more resources which need to
> be
> > >> > >>>> released soon.
> > >> > >>>> >    - It complicates the ResourceManager/SlotManager, by
> > >> requiring an
> > >> > >>>> >    additional slot state PENDING, which means the slot is
> > >> assigned
> > >> > by
> > >> > >>>> RM but
> > >> > >>>> >    is not confirmed successfully ordered by TM.
> > >> > >>>> >
> > >> > >>>> > Why not just make RM offer the allocated resources (TM
> address,
> > >> > >>>> SlotID,
> > >> > >>>> > etc.) to JM, and JM release resources to RM? So that for all
> > the
> > >> > >>>> resource
> > >> > >>>> > management JM talks to RM, and for the task deployment and
> > >> execution
> > >> > >>>> it
> > >> > >>>> > talks to TM?
> > >> > >>>> >
> > >> > >>>> > I tried to understand the benefits for having the current
> > design,
> > >> > and
> > >> > >>>> found
> > >> > >>>> > the following in FLIP-6[2].
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>> > All that the ResourceManager does is negotiate between the
> > >> > >>>> > cluster-manager, the JobManager, and the TaskManagers. Its
> > state
> > >> can
> > >> > >>>> hence
> > >> > >>>> > be reconstructed from re-acquiring containers and
> > re-registration
> > >> > from
> > >> > >>>> > JobManagers and TaskManagers
> > >> > >>>> >
> > >> > >>>> > Correct me if I'm wrong, it seems the original purpose is to
> > make
> > >> > >>>> sure the
> > >> > >>>> > assignment between jobs and slots are confirmed between JM
> and
> > >> TMs,
> > >> > >>>> so that
> > >> > >>>> > failures of RM will not lead to any inconsistency. However,
> > this
> > >> > only
> > >> > >>>> > benefits scenarios where RM fails while JM and TMs live.
> > >> Currently,
> > >> > >>>> JM and
> > >> > >>>> > RM are in the same process. We do not really have any
> scenario
> > >> where
> > >> > >>>> RM
> > >> > >>>> > fails alone. We might separate JM and RM to different
> processes
> > >> in
> > >> > >>>> future,
> > >> > >>>> > but as far as I can see we don't have such requirements at
> the
> > >> > >>>> moment. It
> > >> > >>>> > seems to me that we are suffering the current problems,
> > >> complying to
> > >> > >>>> > potential future benefits.
> > >> > >>>> >
> > >> > >>>> > Maybe I overlooked something.
> > >> > >>>> >
> > >> > >>>> > *5. Implementation Plan*
> > >> > >>>> >
> > >> > >>>> > For SlotPool, it sounds quite straightforward to "aggregate
> > >> > >>>> individual slot
> > >> > >>>> > requests".
> > >> > >>>> >
> > >> > >>>> > For Resource/SlotManager, it seems there are quite a lot
> > changes
> > >> > >>>> needed,
> > >> > >>>> > with the removal of individual slot requests and
> AllocationID.
> > >> It's
> > >> > >>>> not
> > >> > >>>> > clear to me what is the first step plan for RM/SM? Do we
> > >> internally
> > >> > >>>> treat
> > >> > >>>> > the resource requirements as individual slot requests as the
> > >> first
> > >> > >>>> step, so
> > >> > >>>> > only the interfaces are changed? Or do we actually change
> > >> > (practically
> > >> > >>>> > re-write) the slot allocation logics?
> > >> > >>>> >
> > >> > >>>> > Thank you~
> > >> > >>>> >
> > >> > >>>> > Xintong Song
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>> > [1]
> > >> > >>>>
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> > >> > >>>> > [2]
> > >> > >>>>
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
> > >> > >>>> >
> > >> > >>>> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <
> > >> > [hidden email]>
> > >> > >>>> <[hidden email]> wrote:
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>> > Hello,
> > >> > >>>> >
> > >> > >>>> > in FLIP-138 we want to rework the way the JobMaster acquires
> > >> slots,
> > >> > >>>> such
> > >> > >>>> > that required resources are declared before a job is
> scheduled
> > >> and
> > >> > th
> > >> > >>>> > job execution is adjusted according to the provided resources
> > >> (e.g.,
> > >> > >>>> > reducing parallelism), instead of asking for a fixed number
> of
> > >> > >>>> resources
> > >> > >>>> > during scheduling and failing midway through if not enough
> > >> resources
> > >> > >>>> are
> > >> > >>>> > available.
> > >> > >>>> >
> > >> > >>>> > This is a stepping stone towards reactive mode, where Flink
> > will
> > >> > >>>> > automatically make use of new TaskExecutors being started.
> > >> > >>>> >
> > >> > >>>> > More details can be found here
> > >> > >>>> > <
> > >> > >>>>
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
> > >> > >>>> >
> > >> > >>>> > .
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>>
> > >> > >>>
> > >> > >>>
> > >> > >>
> > >> >
> > >>
> > >
> >
>