[DISCUSS] Lifecycle of ShuffleMaster and its Relationship with JobMaster and PartitionTracker

yingjie
Hi devs,

I'd like to start a discussion about "Lifecycle of ShuffleMaster and its
Relationship with JobMaster and PartitionTracker". (These are things we
found when moving our external shuffle to the pluggable shuffle service
framework.)

The mail client may fail to display the right format. If so, please refer
to this document:
https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit?usp=sharing

Lifecycle of ShuffleMaster

Currently, the lifecycle of ShuffleMaster is unclear. The
ShuffleServiceFactory is loaded for each JobMaster instance, and
ShuffleServiceFactory#createShuffleMaster is then called to create a
ShuffleMaster instance. However, the default NettyShuffleServiceFactory
always returns the same ShuffleMaster singleton instance for all jobs. In
the current implementation, the lifecycle of ShuffleMaster is therefore
left open and depends on the shuffle plugin itself. On the TM side, by
contrast, the ShuffleEnvironment is part of the TaskManagerServices, whose
lifecycle is decoupled from jobs and which behaves more like a service.
This means the TM side and the JM side are inconsistent.

From my understanding, the reason is that the pluggable shuffle framework
is not completely finished yet. For example, there is a follow-up umbrella
ticket, FLINK-19551 <https://issues.apache.org/jira/browse/FLINK-19551>,
for the pluggable shuffle service framework, and one of its subtasks
(FLINK-12731 <https://issues.apache.org/jira/browse/FLINK-12731>) aims to
load the shuffle plugin with the PluginManager. I think this can solve the
issue mentioned above. After the corresponding factory is loaded by the
PluginManager, all ShuffleMaster instances can be stored in a map indexed
by the corresponding factory class name and shared by all jobs. The
ShuffleMaster then becomes a cluster-level service, which is consistent
with the ShuffleEnvironment on the TM side.
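
To make the shared-map idea concrete, here is a minimal sketch. It is only
an illustration under assumptions: SimpleShuffleMaster and
ShuffleMasterRegistry are hypothetical names that do not exist in Flink,
and the creator callback stands in for
ShuffleServiceFactory#createShuffleMaster.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for Flink's ShuffleMaster; the sketch needs no methods on it.
interface SimpleShuffleMaster {}

// Hypothetical cluster-level registry (not part of Flink): it keeps at most
// one ShuffleMaster per factory class name and shares it between all jobs.
final class ShuffleMasterRegistry {
    private final Map<String, SimpleShuffleMaster> masters = new ConcurrentHashMap<>();

    // Returns the instance already stored for this factory, or creates and stores one.
    SimpleShuffleMaster getOrCreate(
            String factoryClassName, Supplier<SimpleShuffleMaster> creator) {
        return masters.computeIfAbsent(factoryClassName, name -> creator.get());
    }

    int size() {
        return masters.size();
    }
}
```

With such a registry, two jobs configured with the same factory class would
receive the same ShuffleMaster instance, while jobs using different
factories would get separate ones.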

In summary, we propose to finish FLINK-12731
<https://issues.apache.org/jira/browse/FLINK-12731> first and make the
shuffle service a real cluster-level service. Furthermore, we propose to
add two lifecycle methods to the ShuffleMaster interface, start and close,
which are responsible for initialization (for example, contacting the
external system) and graceful shutdown (for example, releasing resources)
respectively (these methods already exist in the ShuffleEnvironment
interface on the TM side). What do you think?

Relationship of ShuffleMaster & JobMaster

Currently, the JobMaster holds a reference to the corresponding
ShuffleMaster and can register partitions with it (that is, allocate
ShuffleDescriptors from it) through the registerPartitionWithProducer
method. To support use cases like allocating external resources when a job
starts and releasing all allocated resources when a job terminates, we may
also need job-level initialization and finalization. These job-level hooks
are also helpful when serving multiple jobs simultaneously.

In summary, we propose to add two job-level lifecycle methods, registerJob
and unregisterJob, which are responsible for job-level shuffle
initialization and finalization, for example, releasing all external
resources occupied by the corresponding job. What do you think?

Relationship of ShuffleMaster & PartitionTracker

Currently, the JobMasterPartitionTracker can release external result
partitions through the releasePartitionExternally method of ShuffleMaster.
However, the shuffle plugin (ShuffleMaster) may also need the ability to
stop tracking some partitions depending on the status of the external
services. For example, if an external storage node that stores some
partitions crashes, we need to stop tracking all partitions on it at once,
to avoid reproducing the lost partitions one by one. Introducing something
like a ShuffleContext that delegates to the partition tracker satisfies
this requirement easily. Besides, we also need the ability to release
cluster partitions.

In summary, we propose to add a releaseDataSetExternally method to the
ShuffleMaster interface, which is responsible for releasing cluster
partitions. Besides, we propose to add a ShuffleContext which can delegate
to the PartitionTracker and stop tracking partitions. Two separate
ShuffleContext abstractions are needed, one for cluster partitions and one
for job partitions. What do you think?

Interface Change Summary

As discussed in the above sections, we propose to make some interface
changes around the ShuffleMaster interface. The first change is to pass a
ShuffleMasterContext instance to the ShuffleServiceFactory when creating
the ShuffleMaster, just like the ShuffleEnvironment creation on the TM
side. Changes are marked in bold text (the same below).

public interface ShuffleServiceFactory<
        SD extends ShuffleDescriptor,
        P extends ResultPartitionWriter,
        G extends IndexedInputGate> {

    /**
     * Factory method to create a specific {@link ShuffleMaster} implementation.
     */
    ShuffleMaster<SD> createShuffleMaster(ShuffleMasterContext shuffleMasterContext);

    /**
     * Factory method to create a specific local {@link ShuffleEnvironment}
     * implementation.
     */
    ShuffleEnvironment<P, G> createShuffleEnvironment(
            ShuffleEnvironmentContext shuffleEnvironmentContext);
}

The following is the ShuffleMasterContext interface. It will be
implemented by the pluggable shuffle framework itself and can be used by
the shuffle plugin. A context interface is also easier to extend in the
future than a list of individual parameters.

public interface ShuffleMasterContext {

    /** Gets the cluster configuration. */
    Configuration getConfiguration();

    /** Handles the fatal error if any. */
    void onFatalError(Throwable throwable);

    /**
     * Stops tracking the target dataset (cluster partitions), which means
     * these data can not be reused anymore.
     */
    CompletableFuture<Void> stopTrackingDataSet(IntermediateDataSetID dataSetID);

    /**
     * Returns IDs of all datasets (cluster partitions) being tracked by
     * this cluster currently.
     */
    CompletableFuture<List<IntermediateDataSetID>> listDataSets();
}
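
As an illustration of how a plugin might combine listDataSets and
stopTrackingDataSet, here is a rough sketch. It rests on assumptions:
DataSetTrackingContext is a hypothetical stand-in for the two dataset
methods above with plain strings instead of IntermediateDataSetID, and
DataSetReconciler and the availableExternally set are made up for the
example.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the dataset-related part of the proposed
// ShuffleMasterContext; dataset IDs are plain strings in this sketch.
interface DataSetTrackingContext {
    CompletableFuture<Void> stopTrackingDataSet(String dataSetId);

    CompletableFuture<List<String>> listDataSets();
}

// Hypothetical helper (not part of Flink or the proposal).
final class DataSetReconciler {

    // Stops tracking every tracked dataset that the external storage no longer holds.
    // The returned future completes once all stop-tracking requests have completed.
    static CompletableFuture<Void> reconcile(
            DataSetTrackingContext context, Set<String> availableExternally) {
        return context.listDataSets().thenCompose(tracked -> {
            CompletableFuture<?>[] pending = tracked.stream()
                    .filter(id -> !availableExternally.contains(id))
                    .map(context::stopTrackingDataSet)
                    .toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(pending);
        });
    }
}
```

A plugin could run such a reconciliation after it detects that part of the
external storage is gone, so that stale cluster partitions are no longer
offered for reuse.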

The second part to be enhanced is the ShuffleMaster interface. Methods to
be added include start, close, registerJob, unregisterJob and
releaseDataSetExternally. In addition, because each ShuffleMaster instance
can serve multiple jobs simultaneously, when registering partitions, one
should also provide the corresponding JobID. The following shows the
updated ShuffleMaster interface:

public interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {

    /**
     * Starts this shuffle master, for example getting the access and
     * connecting to the external system.
     */
    void start() throws Exception;

    /** Closes this shuffle master which releases all resources. */
    void close() throws Exception;

    /** Registers the target job to this shuffle master. */
    void registerJob(JobShuffleContext context);

    /** Unregisters the target job from this shuffle master. */
    void unregisterJob(JobID jobID);

    /**
     * Asynchronously register a partition and its producer with the
     * shuffle service.
     */
    CompletableFuture<T> registerPartitionWithProducer(
            JobID jobID,
            PartitionDescriptor partitionDescriptor,
            ProducerDescriptor producerDescriptor);

    /** Releases any external resources occupied by the given partition. */
    void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor);

    /** Releases the target cluster partitions stored externally if any. */
    void releaseDataSetExternally(IntermediateDataSetID dataSetID);
}
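
To illustrate the intended call order (start, registerJob, partition
registration, unregisterJob, close), here is a simplified sketch of what a
plugin might do internally. It is an assumption-laden toy, not the proposed
interface: SketchShuffleMaster is a hypothetical class, job and partition
IDs are plain strings, and the "external resources" are just entries in a
map.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified shuffle master (not the real Flink interface).
final class SketchShuffleMaster implements AutoCloseable {
    // Partitions registered per job; stands in for externally allocated resources.
    private final Map<String, Set<String>> partitionsByJob = new ConcurrentHashMap<>();
    private volatile boolean running;

    // Cluster-level initialization, e.g. connecting to the external system.
    void start() {
        running = true;
    }

    // Job-level initialization: prepares the per-job bookkeeping.
    void registerJob(String jobId) {
        checkRunning();
        partitionsByJob.putIfAbsent(jobId, ConcurrentHashMap.<String>newKeySet());
    }

    // Partitions can only be registered for a job that was registered before.
    void registerPartition(String jobId, String partitionId) {
        checkRunning();
        Set<String> partitions = partitionsByJob.get(jobId);
        if (partitions == null) {
            throw new IllegalStateException("Job not registered: " + jobId);
        }
        partitions.add(partitionId);
    }

    // Job-level finalization: drops everything held for the job and
    // returns how many partitions were released.
    int unregisterJob(String jobId) {
        Set<String> released = partitionsByJob.remove(jobId);
        return released == null ? 0 : released.size();
    }

    // Cluster-level shutdown: releases whatever is still held.
    @Override
    public void close() {
        running = false;
        partitionsByJob.clear();
    }

    private void checkRunning() {
        if (!running) {
            throw new IllegalStateException("Shuffle master is not running");
        }
    }
}
```

Keeping the per-job state keyed by job ID is what lets one instance serve
several jobs at once and release exactly one job's resources in
unregisterJob.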

The following  is the JobShuffleContext interface. It will be implemented
by the pluggable shuffle framework itself and can be used by the shuffle
plugin.

public interface JobShuffleContext {

    /** Gets the corresponding job configuration. */
    Configuration getConfiguration();

    /** Gets the corresponding {@link JobID}. */
    JobID getJobID();

    /**
     * Stops tracking the target result partitions, which means these
     * partitions will be reproduced if used afterwards.
     */
    CompletableFuture<Void> stopTrackingPartitions(
            Collection<ResultPartitionID> partitionIDS);

    /**
     * Returns information of all partitions being tracked for the current
     * job.
     */
    CompletableFuture<List<ResultPartitionDeploymentDescriptor>> listPartitions();
}
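
The storage node crash case mentioned earlier could look roughly like the
sketch below. Again, this is only an illustration under assumptions:
PartitionTrackingContext is a hypothetical stand-in for the
stopTrackingPartitions method of JobShuffleContext with string IDs, and
StorageNodeMonitor is an invented helper that a plugin might keep
internally.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for JobShuffleContext#stopTrackingPartitions.
interface PartitionTrackingContext {
    CompletableFuture<Void> stopTrackingPartitions(Collection<String> partitionIds);
}

// Hypothetical plugin-side helper (not part of Flink). Not thread-safe;
// a real plugin would need its own synchronization.
final class StorageNodeMonitor {
    private final Map<String, Set<String>> partitionsByNode = new HashMap<>();
    private final PartitionTrackingContext context;

    StorageNodeMonitor(PartitionTrackingContext context) {
        this.context = context;
    }

    // Remembers on which external storage node a partition is stored.
    void recordPartition(String nodeId, String partitionId) {
        partitionsByNode.computeIfAbsent(nodeId, id -> new HashSet<>()).add(partitionId);
    }

    // Stops tracking all partitions of the lost node in a single call,
    // instead of letting each one fail and be reproduced one by one.
    CompletableFuture<Void> onNodeLost(String nodeId) {
        Set<String> lost = partitionsByNode.remove(nodeId);
        if (lost == null) {
            return CompletableFuture.completedFuture(null);
        }
        return context.stopTrackingPartitions(new ArrayList<>(lost));
    }
}
```

Reporting the same node as lost a second time is a no-op here, because its
partitions were already removed from the bookkeeping.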

What do you think of these changes? Any feedback is highly appreciated.