The partition tracker should support remote shuffle properly

XING JIN
Hi devs ~
Recently our team designed and started to build a Flink remote shuffle
service for batch processing jobs, based on the 'pluggable shuffle service
framework'[1]. We found some potential enhancements to the 'pluggable
shuffle service' and created an umbrella JIRA[2]. I'm raising this
DISCUSSION to hear broader feedback / comments on one ticket [3]
-- "The partition tracker should support remote shuffle properly".

In current Flink, a data partition is bound to the ResourceID of the TM in
Execution#startTrackingPartitions, and the JM partition tracker stops
tracking the corresponding partitions when a TM disconnects
(JobMaster#disconnectTaskManager), i.e. the lifecycle of shuffle data is
bound to the computing resource (TM). This works fine for the internal
shuffle service, but not for a remote shuffle service. When shuffle data is
stored remotely, the lifecycle of a completed partition can be decoupled
from the TM, i.e. a TM can safely be released once no computing task is
running on it, and further shuffle read requests can be directed to the
remote shuffle cluster. In addition, when a TM is lost, its completed data
partitions on the remote shuffle cluster do not need to be reproduced.

The issue above arises because Flink's JobMasterPartitionTracker mixes up a
partition's locationID (where the partition is located) and its tmID (which
TM produced the partition). In TM internal shuffle, a partition's
locationID is the same as its tmID, but in remote shuffle it is not.
JobMasterPartitionTracker, as an independent component, should be able to
differentiate the locationID and tmID of a partition in order to handle the
partition's lifecycle properly.

We propose that JobMasterPartitionTracker index partitions by both
locationID and tmID. Registration and unregistration would then work as
follows:

A. Partition Registration
- Execution#registerProducedPartitions registers the partition with
ShuffleMaster and gets a ShuffleDescriptor. Currently,
ShuffleDescriptor#storesLocalResourcesOn returns the location of the
producing TM ONLY IF the partition occupies local resources there. We
propose to give this method a more fitting name and have it always return
the locationID of the partition. It might look like this:
    ResourceID getLocationID();
- Execution#registerProducedPartitions then registers the partition with
JMPartitionTracker, passing the tmID (ResourceID of the TaskManager from
TaskManagerLocation) and the locationID (acquired in the step above).
JobMasterPartitionTracker will index the partition by both tmID and
locationID;
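
To make step A concrete, here is a minimal sketch of what indexing by both
IDs could look like. It is not Flink code: RegistrationSketch, Descriptor,
Tracker and startTrackingPartition are hypothetical stand-ins, and plain
Strings take the place of ResourceID and the partition ID. Only
getLocationID() comes from the proposal above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins only; none of these are the real Flink classes.
// Plain Strings play the role of ResourceID and of the partition ID.
final class RegistrationSketch {

    /** Simplified descriptor exposing the proposed getLocationID(). */
    static final class Descriptor {
        final String partitionId;
        final String locationId;

        Descriptor(String partitionId, String locationId) {
            this.partitionId = partitionId;
            this.locationId = locationId;
        }

        String getLocationID() {
            return locationId;
        }
    }

    /** Tracker that indexes every partition by producing TM and by data location. */
    static final class Tracker {
        final Map<String, Set<String>> byTmId = new HashMap<>();
        final Map<String, Set<String>> byLocationId = new HashMap<>();

        void startTrackingPartition(String tmId, Descriptor descriptor) {
            byTmId.computeIfAbsent(tmId, k -> new HashSet<>()).add(descriptor.partitionId);
            byLocationId
                    .computeIfAbsent(descriptor.getLocationID(), k -> new HashSet<>())
                    .add(descriptor.partitionId);
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker();
        // Internal shuffle: the data lives on the producing TM.
        tracker.startTrackingPartition("tm-1", new Descriptor("p-1", "tm-1"));
        // Remote shuffle: same producer, but the data lives on a shuffle worker.
        tracker.startTrackingPartition("tm-1", new Descriptor("p-2", "shuffle-worker-7"));
        System.out.println("by tmID:       " + tracker.byTmId);
        System.out.println("by locationID: " + tracker.byLocationId);
    }
}
```

With internal shuffle the two indexes hold the same key for a partition;
with remote shuffle they differ, which is the distinction the tracker
currently cannot express.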

B. Invocations from JM and ShuffleMaster
JobMasterPartitionTracker listens for invocations from both JM and
ShuffleMaster.
- When JMPartitionTracker hears from JobMaster#disconnectTaskManager that a
TM has disconnected, it checks whether the disconnected tmID equals the
locationID of any tracked partition. If so, tracking of the corresponding
partitions is stopped.
- When JobMasterPartitionTracker hears from ShuffleMaster that a data
location has been lost, it unregisters the corresponding partitions by
locationID;
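
A rough sketch of the two callbacks in step B, under the assumption that
both end up as a lookup by locationID. The class and method names
(InvocationSketch, onTaskManagerDisconnected, onDataLocationLost) are made
up for illustration and Strings stand in for ResourceID; a real tracker
would keep a proper locationID index instead of the linear scan used here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the tracker; Strings replace ResourceID and partition IDs.
final class InvocationSketch {

    static final class Tracker {
        // partitionId -> locationID of the data
        private final Map<String, String> locationOf = new HashMap<>();
        // partitionId -> tmID of the producer
        private final Map<String, String> producerOf = new HashMap<>();

        void startTrackingPartition(String partitionId, String tmId, String locationId) {
            producerOf.put(partitionId, tmId);
            locationOf.put(partitionId, locationId);
        }

        boolean isTracked(String partitionId) {
            return locationOf.containsKey(partitionId);
        }

        /** JM side: a TM disconnected. Only partitions stored on that TM are dropped. */
        List<String> onTaskManagerDisconnected(String tmId) {
            return stopTrackingByLocation(tmId);
        }

        /** ShuffleMaster side: a data location was lost. */
        List<String> onDataLocationLost(String locationId) {
            return stopTrackingByLocation(locationId);
        }

        // Linear scan for brevity; returns the removed partition IDs in sorted order.
        private List<String> stopTrackingByLocation(String locationId) {
            List<String> removed = new ArrayList<>();
            Iterator<Map.Entry<String, String>> it = locationOf.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, String> entry = it.next();
                if (entry.getValue().equals(locationId)) {
                    removed.add(entry.getKey());
                    it.remove();
                }
            }
            producerOf.keySet().removeAll(removed);
            Collections.sort(removed);
            return removed;
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker();
        tracker.startTrackingPartition("p-1", "tm-1", "tm-1");
        tracker.startTrackingPartition("p-2", "tm-1", "shuffle-worker-7");
        System.out.println("dropped on TM disconnect: " + tracker.onTaskManagerDisconnected("tm-1"));
        System.out.println("p-2 still tracked: " + tracker.isTracked("p-2"));
    }
}
```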

C. Partition Unregistration
When unregistering a partition, JobMasterPartitionTracker first removes the
corresponding index entries for tmID and locationID, and then releases the
partition according to the shuffle service type:
- If the locationID equals the tmID, the partition is hosted by the TM
internal shuffle service, and JMPartitionTracker invokes
TaskExecutorGateway for the release;
- If the locationID doesn't equal the tmID, the partition is hosted by an
external shuffle service, and JMPartitionTracker invokes ShuffleMaster for
the release;
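
The release decision in step C boils down to one comparison. A small
sketch, with ReleaseSketch, ReleaseTarget and releaseTargetFor as
hypothetical names and Strings in place of ResourceID:

```java
// Hypothetical stand-ins; Strings replace ResourceID.
final class ReleaseSketch {

    /** Which component would be asked to release the partition. */
    enum ReleaseTarget { TASK_EXECUTOR_GATEWAY, SHUFFLE_MASTER }

    /** Same ID means the data sits on the producing TM; otherwise it is external. */
    static ReleaseTarget releaseTargetFor(String tmId, String locationId) {
        return locationId.equals(tmId)
                ? ReleaseTarget.TASK_EXECUTOR_GATEWAY
                : ReleaseTarget.SHUFFLE_MASTER;
    }

    public static void main(String[] args) {
        System.out.println(releaseTargetFor("tm-1", "tm-1"));
        System.out.println(releaseTargetFor("tm-1", "shuffle-worker-7"));
    }
}
```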

With the above change, JobMasterPartitionTracker can handle the partition
lifecycle properly for customized shuffle services.

Looking forward to your input on this ~~

Best,
Jin


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Service
[2] https://issues.apache.org/jira/browse/FLINK-22672
[3] https://issues.apache.org/jira/browse/FLINK-22676

Re: The partition tracker should support remote shuffle properly

Till Rohrmann
Hi Jin,

thanks for starting this discussion and the initiative to implement a
remote shuffle service. It has always been the idea of the ShuffleService
abstraction to make this possible and we probably have overlooked some
details.

From what I understand of your description, you would like to introduce
the locationID, which points to the location where the result partition is
stored (potentially external). Using the locationID and the tmID, it is
possible to say whether the partition is stored externally or not.

I think that deciding whether the partition is stored externally or not (or
more precisely whether the partition occupies resources on the TM) can be
answered by using the ShuffleDescriptor.storesLocalResourcesOn method. If
it returns some ResourceID then we have to tell the TM about the release.
If not, then we only tell the shuffle master about the partition release.
How the data can be accessed on the external system is then encapsulated by
the ShuffleMaster and the ShuffleDescriptor. The logic for releasing the
partitions on the TMs and the ShuffleMaster should already be implemented
in the JobMasterPartitionTrackerImpl.
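
As a rough illustration of the decision described above, here is a sketch
that only looks at whether storesLocalResourcesOn returns a value. The
Descriptor interface is a simplified stand-in with a String instead of a
ResourceID, and LocalResourceSketch, ReleaseTarget and releaseTargetFor are
hypothetical names, not the JobMasterPartitionTrackerImpl code.

```java
import java.util.Optional;

// Hypothetical stand-ins; the Descriptor below only mimics the shape of the method.
final class LocalResourceSketch {

    /** Simplified descriptor: present value means local resources on that TM. */
    interface Descriptor {
        Optional<String> storesLocalResourcesOn();
    }

    enum ReleaseTarget { TASK_EXECUTOR, SHUFFLE_MASTER_ONLY }

    /** Tell the TM if it holds resources, otherwise only the shuffle master. */
    static ReleaseTarget releaseTargetFor(Descriptor descriptor) {
        return descriptor.storesLocalResourcesOn().isPresent()
                ? ReleaseTarget.TASK_EXECUTOR
                : ReleaseTarget.SHUFFLE_MASTER_ONLY;
    }

    public static void main(String[] args) {
        Descriptor internal = () -> Optional.of("tm-1");
        Descriptor external = Optional::empty;
        System.out.println(releaseTargetFor(internal));
        System.out.println(releaseTargetFor(external));
    }
}
```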

I think what we need to change is that we don't stop the tracking of
completed partitions when a TM on which the producers run disconnects,
provided that we store the result partition externally. This is required to
make partitions survive TM failures. It also requires us to distinguish
between finished and in-progress partitions.
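
One possible reading of this rule, as a sketch: on a TM disconnect, keep
tracking only those partitions of that TM that are both finished and stored
externally. The names (DisconnectSketch, TrackedPartition,
shouldStopTracking) are hypothetical, and dropping in-progress partitions
of the lost TM is an assumption based on the paragraph above, not confirmed
behaviour.

```java
// Hypothetical stand-ins; Strings replace ResourceID.
final class DisconnectSketch {

    static final class TrackedPartition {
        final String producerTmId;
        final boolean storedExternally;
        final boolean finished;

        TrackedPartition(String producerTmId, boolean storedExternally, boolean finished) {
            this.producerTmId = producerTmId;
            this.storedExternally = storedExternally;
            this.finished = finished;
        }
    }

    /** Returns true if tracking should stop because the given TM disconnected. */
    static boolean shouldStopTracking(TrackedPartition partition, String disconnectedTmId) {
        if (!partition.producerTmId.equals(disconnectedTmId)) {
            return false; // produced by another TM, not affected
        }
        // Assumption: only finished, externally stored partitions survive the TM.
        return !(partition.storedExternally && partition.finished);
    }

    public static void main(String[] args) {
        TrackedPartition finishedExternal = new TrackedPartition("tm-1", true, true);
        TrackedPartition runningExternal = new TrackedPartition("tm-1", true, false);
        System.out.println(shouldStopTracking(finishedExternal, "tm-1"));
        System.out.println(shouldStopTracking(runningExternal, "tm-1"));
    }
}
```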

What indeed is currently not implemented is the channel from the
ShuffleMaster to the JobMasterPartitionTrackerImpl. This is, however, not a
big problem atm. If the ShuffleMaster should lose a result partition, then
a reading task should fail with a PartitionException which will invalidate
the partition on the JMPartitionTracker so that it is reproduced. Listening
to the ShuffleMaster would be an optimization to learn more quickly about
this fact and to avoid a restart cycle.

Did I understand you correctly, Jin, and do my comments make sense?

Cheers,
Till

On Wed, May 26, 2021 at 5:52 AM XING JIN <[hidden email]> wrote:

> [...]

Re: The partition tracker should support remote shuffle properly

XING JIN
Hi, Till ~ Thanks for your comments. Yes, I think we are on the same page:
we are discussing how JMPartitionTracker should manage partitions properly ~

> If the ShuffleMaster should lose a result partition, then a reading task
> should fail with a PartitionException which will invalidate the partition
> on the JobMasterPartitionTracker so that it is reproduced.

1. True, the reproduction of a lost upstream partition could be triggered
by a shuffle-read failure downstream. But that tends to be unacceptable in
production environments for batch processing jobs. Say 100 upstream
partitions are lost due to the failure of an external shuffle worker. If
there's no notification from ShuffleMaster to JMPartitionTracker, 100
PartitionExceptions will happen downstream and the upstream partitions will
be reproduced one by one, sequentially. The time overhead will be
unacceptable. From this point of view, I tend to think ShuffleMaster should
have the ability to unregister partitions by locationID on
JMPartitionTracker.
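
To put rough numbers on this, here is a deliberately simplified model. It
assumes that without a notification each failed read reveals exactly one
lost partition per recovery round, and that with a notification all
partitions of the lost location are invalidated together. All names
(RecoveryRoundsSketch, roundsWithReadFailures, roundsWithNotification) are
hypothetical and the model ignores scheduling details.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model only; it counts recovery rounds, not wall-clock time.
final class RecoveryRoundsSketch {

    /** Each PartitionException invalidates one partition, which is then reproduced. */
    static int roundsWithReadFailures(Set<String> lostPartitions) {
        Set<String> remaining = new HashSet<>(lostPartitions);
        int rounds = 0;
        while (!remaining.isEmpty()) {
            remaining.remove(remaining.iterator().next());
            rounds++;
        }
        return rounds;
    }

    /** ShuffleMaster reports the lost location, so all its partitions go at once. */
    static int roundsWithNotification(Set<String> lostPartitions) {
        return lostPartitions.isEmpty() ? 0 : 1;
    }

    public static void main(String[] args) {
        Set<String> lost = new HashSet<>();
        for (int i = 0; i < 100; i++) {
            lost.add("p-" + i);
        }
        System.out.println("read failures: " + roundsWithReadFailures(lost) + " rounds");
        System.out.println("notification:  " + roundsWithNotification(lost) + " round(s)");
    }
}
```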

> I think that deciding whether the partition is stored externally or not
> can be answered by using the ShuffleDescriptor.storesLocalResourcesOn
> method.

2. Yes, I agree it works (though it's not easy to access the
ShuffleDescriptor by tmID from JMPartitionTracker at the moment). But if we
agree that my first point is valid, JMPartitionTracker should maintain an
index from locationID to partition. Then it will be straightforward to
check whether a partition is stored remotely by comparing its tmID and
locationID.

Best,
Jin

On Tue, Jun 1, 2021 at 4:59 PM Till Rohrmann <[hidden email]> wrote:

> [...]