[DISCUSS] Retrieval services in non-high-availability scenario

[DISCUSS] Retrieval services in non-high-availability scenario

tison
Hi devs,

I'd like to start a discussion thread on how we provide
retrieval services in non-high-availability scenarios. To clarify
terminology, the non-high-availability scenario refers to
StandaloneHaServices and EmbeddedHaServices.

***The problem***

We notice that the retrieval services of the current StandaloneHaServices
(pre-configured) and EmbeddedHaServices (in-memory) have their
respective problems.

For the pre-configured scenario, we now have a
getJobManagerLeaderRetriever(JobID, defaultJMAddress) method
to work around the fact that the JM address cannot be configured
in advance. The parameter defaultJMAddress is not used by any
other high-availability mode. Also, in the MiniCluster scenario and
anywhere else where pre-configuring the leader address is impossible,
StandaloneHaServices cannot be used.

For the in-memory case, it clearly doesn't fit any distributed
scenario.

***The proposal***

In order to address the inconsistency between pre-configured retrieval
services and ZooKeeper-based retrieval services, we reconsider the
promises provided by "non-high-availability" and regard it as a
service similar to the ZooKeeper-based one, except that it doesn't
tolerate node failure. Thus, we implement a service that acts like a
standalone ZooKeeper cluster, named LeaderServer.

A leader server is an actor that runs on the JobManager actor system and
reacts to leader contender registrations and leader retriever requests.
If the JobManager fails, the associated leader server fails too, which
is where "non-high-availability" stands.

In order to communicate with the leader server, we start a leader client
per high-availability services instance (JM, TM, ClusterClient). When the
leader election service starts, it registers the contender with the leader
server via the leader client (by Akka communication); when the leader
retriever starts, it registers itself with the leader server via the
leader client.

The leader server handles leader election internally just like the
Embedded implementation, and notifies retrievers with new leader
information whenever a new leader is elected.
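
To make the protocol concrete, below is a rough, in-memory Java sketch of
the messages and bookkeeping involved. All names (RegisterContender,
RegisterRetriever, LeaderServer) are placeholders for this proposal rather
than existing Flink classes, and the real transport would be Akka messages
instead of direct method calls.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiConsumer;

// Message sent by a leader election service: "this contender wants to lead serviceName".
final class RegisterContender {
    final String serviceName;      // e.g. "dispatcher", "resourcemanager", "jobmanager-<JobID>"
    final String contenderAddress; // address to publish once the contender is leader
    RegisterContender(String serviceName, String contenderAddress) {
        this.serviceName = serviceName;
        this.contenderAddress = contenderAddress;
    }
}

// Message sent by a leader retrieval service: "tell me who leads serviceName, now and later".
final class RegisterRetriever {
    final String serviceName;
    final BiConsumer<String, UUID> listener; // (leaderAddress, leaderSessionId)
    RegisterRetriever(String serviceName, BiConsumer<String, UUID> listener) {
        this.serviceName = serviceName;
        this.listener = listener;
    }
}

// In-memory bookkeeping, analogous to what the Embedded implementation does today.
final class LeaderServer {
    private final Map<String, String> leaders = new HashMap<>();
    private final Map<String, UUID> sessions = new HashMap<>();
    private final Map<String, List<BiConsumer<String, UUID>>> retrievers = new HashMap<>();

    // First contender per service wins; a full implementation also handles revocation.
    synchronized void handle(RegisterContender msg) {
        if (!leaders.containsKey(msg.serviceName)) {
            UUID sessionId = UUID.randomUUID();
            leaders.put(msg.serviceName, msg.contenderAddress);
            sessions.put(msg.serviceName, sessionId);
            retrievers.getOrDefault(msg.serviceName, new ArrayList<>())
                    .forEach(l -> l.accept(msg.contenderAddress, sessionId));
        }
    }

    // Retrievers learn the current leader immediately (if any) plus all future changes.
    synchronized void handle(RegisterRetriever msg) {
        retrievers.computeIfAbsent(msg.serviceName, k -> new ArrayList<>()).add(msg.listener);
        String leader = leaders.get(msg.serviceName);
        if (leader != null) {
            msg.listener.accept(leader, sessions.get(msg.serviceName));
        }
    }
}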

In this way, we unify the view of retrieval services in all scenarios:

1. Configure a name service to communicate with. In ZooKeeper mode
it is ZooKeeper, and in non-high-availability mode it is the leader server.
2. Any retrieval request is sent to the name service and is handled
by that service.

Apart from a unified view, there are other advantages:

+ We no longer need the special method
getJobManagerLeaderRetriever(JobID, defaultJMAddress); instead, we use
getJobManagerLeaderRetriever(JobID) (see the sketch after this list).
Consequently, we no longer need to include the JobManager address in
slot requests, where it might become stale during transmission.

+ Separated configuration concerns for launch and retrieval. JobManager
address & port and REST address & port are only configured when launching
a cluster (and even in the YARN scenario there is no need to configure
them). When retrieval is requested, only the connection info of the name
service (ZooKeeper or leader server) needs to be configured.

+ The Embedded implementation could also be included in this abstraction
without any regression in simulating multiple leaders for test purposes.
Actually, the leader server acts as a limited standalone ZooKeeper
cluster. Thus, and this is where this proposal comes from, when we
refactor metadata storage with the transaction store proposed in
FLINK-10333, we only need to take care of the ZooKeeper implementation
and a unified non-high-availability implementation.
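
As a rough illustration of the first point above (again with placeholder
names, not actual Flink signatures): a non-ha HaServices backed by a leader
client can resolve the JobManager purely by JobID, so no default address
has to be carried along.

import java.util.UUID;
import java.util.function.BiConsumer;

// Hypothetical sketch of the retrieval side of a LeaderServer-backed HaServices.
class LeaderClientBackedHaServices {

    // Stand-in for the leader client that talks to the LeaderServer (via Akka in the proposal).
    interface LeaderClient {
        void registerRetriever(String serviceName, BiConsumer<String, UUID> listener);
    }

    private final LeaderClient leaderClient;

    LeaderClientBackedHaServices(LeaderClient leaderClient) {
        this.leaderClient = leaderClient;
    }

    // Compare with today's getJobManagerLeaderRetriever(JobID, defaultJMAddress):
    // the caller never needs to know, or ship around, a pre-configured JM address.
    void getJobManagerLeaderRetriever(String jobId, BiConsumer<String, UUID> listener) {
        leaderClient.registerRetriever("jobmanager-" + jobId, listener);
    }
}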

***Clean up***

It is also noticed that there are several stale & unimplemented
high-availability services implementations which I'd like to remove to keep
the codebase clean for the work in this thread and FLINK-10333. They are:

- YarnHighAvailabilityServices
- AbstractYarnNonHaServices
- YarnIntraNonHaMasterServices
- YarnPreConfiguredMasterNonHaServices
- SingleLeaderElectionService
- FsNegativeRunningJobsRegistry

Any feedback is appreciated.

Best,
tison.

Re: [DISCUSS] Retrieval services in non-high-availability scenario

Till Rohrmann
Hi Tison,

thanks for starting this discussion. I think your mail includes multiple
points which are worth being treated separately (might even make sense to
have separate discussion threads). Please correct me if I understood things
wrongly:

1. Adding new non-ha HAServices:

Based on your description I could see the "ZooKeeper-light" non-ha
HAServices implementation work. Would any changes to the existing
interfaces be needed? How would the LeaderServer integrate in the lifecycle
of the cluster entrypoint?

2. Replacing existing non-ha HAServices with LeaderServer implementation:

I'm not sure whether we need to enforce that every non-ha HAServices
implementation works as you've described. I think it is pretty much an
implementation detail whether the services talk to a LeaderServer or are
being started with a pre-configured address. I also think that it is fair
to have different implementations with different characteristics and usage
scenarios. As you've said the EmbeddedHaServices are targeted for single
process cluster setups and they are only used by the MiniCluster.

What I like about the StandaloneHaServices is that they are dead simple
(apart from the configuration). With a new implementation based on the
LeaderServer, the client side implementation becomes much more complex
because now one needs to handle all kinds of network issues properly.
Moreover, it adds more complexity to the system because it starts a new
distributed component which needs to be managed. I could see that, once
the new implementation has matured enough, it might replace the
EmbeddedHaServices. But I wouldn't start with removing them.

You are right that, because we don't know the JM address before it has been
started, we need to send the address with every slot request. Moreover, we
have the method #getJobManagerLeaderRetriever(JobID, defaultJMAddress) on
the HAServices. While this is not super nice, I don't think that this is a
fundamental problem at the moment. What we pay is a couple of extra bytes
we need to send over the network.

Configuration-wise, I'm not so sure whether we gain too much by replacing
the StandaloneHaServices with the LeaderServer based implementation. For
the new implementation one needs to configure a static address as well at
cluster start-up time. The only benefit I can see is that we don't need to
send the JM address to the RM and TMs. But as I've said, I don't think that
this is a big problem for which we need to introduce new HAServices.
Instead I could see that we might be able to remove it once the
LeaderServer HAServices implementation has proven to be stable.

3. Configuration of HAServices:

I agree that Flink's address and port configuration is not done
consistently. It might make sense to group the address and port
configuration under the ha services configuration section. Maybe it also
makes sense to rename ha services into ServiceDiscovery because they also
work in the non-ha case. It could be possible to only configure address
and port if one is using the non-ha services, for example. However, this
definitely deserves a separate discussion and design because one needs to
check where exactly the respective configuration options are being used.

I think improving the configuration of HAServices is actually orthogonal to
introducing the LeaderServer HAServices implementation and could also be
done for the existing HAServices.

4. Clean up of HAServices implementations:

You are right that some of the existing HAServices implementations are
"dead code" at the moment. They are the result of some implementation ideas
which haven't been completed. I would suggest to start a separate
discussion to discuss what to do with them.

Cheers,
Till


Re: [DISCUSS] Retrieval services in non-high-availability scenario

tison
Hi Till,

Thanks for your reply. I agree that points 3 and 4 in your email are worth
separate threads to discuss. Let me answer your questions and concerns on
points 1 and 2 respectively.

1. Lifecycle of the LeaderServer and what is required to implement it

The LeaderServer starts on the cluster entrypoint and its lifecycle is bound
to the lifecycle of the cluster entrypoint. That is, when the cluster
entrypoint starts, a LeaderServer also starts; and the LeaderServer gets
shut down when the cluster entrypoint gets shut down. This is because we
need to provide service discovery while the cluster is running.

For the implementation part, it is conceptually a service running on the
cluster entrypoint which holds service information in memory and can be
communicated with. In our specific implementation, the LeaderServer is an
actor running on the actor system of the cluster entrypoint, which is
referred to as `commonRpcService`. It is just another unfenced RPC endpoint
and requires no extra changes to the existing interfaces.
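
A rough lifecycle sketch of that binding (names hypothetical, Flink RPC
details omitted): the LeaderServer is started together with the entrypoint
and torn down with it, which is exactly the "non-high-availability" promise.

// Reduced to a start/stop pair; in the proposal this would be an unfenced
// RPC endpoint created on commonRpcService by the cluster entrypoint.
class ClusterEntrypointSketch implements AutoCloseable {

    interface LeaderServerHandle extends AutoCloseable {
        void start();
    }

    private final LeaderServerHandle leaderServer;

    ClusterEntrypointSketch(LeaderServerHandle leaderServer) {
        this.leaderServer = leaderServer;
    }

    void startCluster() {
        // service discovery must be available for the whole lifetime of the cluster
        leaderServer.start();
    }

    @Override
    public void close() throws Exception {
        // shutting down the entrypoint also shuts down the LeaderServer
        leaderServer.close();
    }
}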

Apart from the LeaderServer, there is another concept in this
implementation, the LeaderClient. The LeaderClient forwards register
requests from the election service and the retrieval service, and forwards
leader-changed messages from the LeaderServer. In our specific
implementation, the LeaderClient is an actor and runs on the cluster
entrypoint, the task manager, and the cluster client.

(1) Cluster entrypoint

The lifecycle of the LeaderClient is the same as that of the LeaderServer.

(2) Task manager

The lifecycle of the LeaderClient is bound to the lifecycle of the task
manager. Specifically, it runs on the `rpcService` started by the task
manager runner and stops when that service gets shut down.

(3) Cluster client

The lifecycle of the LeaderClient is bound to the ClusterClient. With our
codebase, only the RestClusterClient needs to do the adaptation. When a
ClientHAService based on the LeaderClient starts, it starts a dedicated RPC
service on which the LeaderClient runs. The service as well as the
LeaderClient gets shut down when the RestClusterClient is closed, which is
when ClientHAService#close is called. It is a transparent implementation
inside a specific ClientHAService; thus, again, there are no changes to the
existing interfaces.
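
A minimal sketch of that client-side wiring (again placeholder names): the
HA service owns both the dedicated RPC service and the LeaderClient, and
tears them down together when the RestClusterClient is closed.

// Sketch of a ClientHAService variant backed by a LeaderClient.
class LeaderClientHaService implements AutoCloseable {

    interface RpcServiceHandle extends AutoCloseable {}   // dedicated RPC/actor system
    interface LeaderClientHandle extends AutoCloseable {} // LeaderClient running on it

    private final RpcServiceHandle rpcService;
    private final LeaderClientHandle leaderClient;

    LeaderClientHaService(RpcServiceHandle rpcService, LeaderClientHandle leaderClient) {
        this.rpcService = rpcService; // started only for this client, see the cons listed below
        this.leaderClient = leaderClient;
    }

    @Override
    public void close() throws Exception {
        // RestClusterClient#close -> ClientHAService#close -> stop the client and its RPC service
        leaderClient.close();
        rpcService.close();
    }
}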

2. The proposal to replace existing non-ha services

Well, I see your concerns about hurriedly replacing existing stable services
with a new implementation. Here I list the pros and cons of this
replacement. If we agree that it does good, I can provide a neat and
full-featured implementation for preview so we can see concretely what we
add and what we gain. For integration, we can then first integrate with
MiniCluster and go further later.

pros:

+ We don't need to pass the address of the job manager along with slot
requests.

With the new implementation, the retriever running on the task manager
registers itself with the LeaderServer, which has a globally static
address, and the retriever retrieves the address of the job manager based
on the JobID. This not only unifies the #getJobManagerLeaderRetriever
interface, but also reduces the cost when the job manager is switched.

Currently, when the job manager loses leadership, slots offered to the old
job manager are not immediately aware of it. They don't get released until
the heartbeat to the job manager times out. With the LeaderServer-based
implementation, the LeaderServer notifies the LeaderClient as soon as the
job manager loses leadership.
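
A small sketch of that faster-release path on the task manager side
(hypothetical names; the real callback would come from the LeaderClient):

import java.util.UUID;

// The task manager listener reacts to a pushed revocation instead of waiting
// for the heartbeat against the old JobManager to time out.
class JobLeaderListenerSketch {

    interface SlotTable {
        void releaseSlotsFor(String jobId, Exception cause);
    }

    private final SlotTable slotTable;

    JobLeaderListenerSketch(SlotTable slotTable) {
        this.slotTable = slotTable;
    }

    // Invoked by the LeaderClient whenever the LeaderServer announces a leader change.
    void notifyLeaderAddress(String jobId, String leaderAddress, UUID leaderSessionId) {
        if (leaderAddress == null) {
            // leadership revoked: release slots right away
            slotTable.releaseSlotsFor(jobId,
                    new Exception("JobManager for job " + jobId + " lost leadership"));
        }
        // otherwise: reconnect to the new leader at leaderAddress (omitted here)
    }
}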

+ We have a unified implementation in the non-ha scenario.

It can be regarded as a location-transparent embedded implementation.

+ We have a unified view of high-availability services.

The LeaderServer-based implementation follows the same view as the
ZooKeeper-based implementation. Since these high-availability services have
no fundamental differences from one another, we can naturally handle them
under a unified view.

I know that we still have to configure the address of the LeaderServer, but
now it is more like the connect string of ZooKeeper instead of the address
of an internal component. In fact, we could then deprecate the
configuration of the job manager port and auto-detect a port, as we do in
the ZooKeeper-based scenario.
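
To illustrate the auto-detection idea with a plain-Java sketch (the publish
call is a hypothetical stand-in): bind to an ephemeral port and publish the
resulting address through the LeaderServer, so that only the LeaderServer
address needs to stay static.

import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

class EphemeralPortExample {

    interface LeaderPublisher {
        void publishLeaderAddress(String serviceName, String address);
    }

    // Port 0 lets the OS pick a free port; the chosen address is then published
    // via the leader service instead of being pre-configured by the user.
    static ServerSocket startJobManagerEndpoint(LeaderPublisher publisher) throws IOException {
        ServerSocket socket = new ServerSocket(0, 0, InetAddress.getLocalHost());
        String address = InetAddress.getLocalHost().getHostAddress() + ":" + socket.getLocalPort();
        publisher.publishLeaderAddress("jobmanager", address);
        return socket; // the caller keeps it open for the lifetime of the endpoint
    }
}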

cons:

- Overhead on the client side

The overhead of transmitting the job manager address is low enough that I
don't list avoiding it as a valid pro. Correspondingly, messages between
actors within an already running actor system are not regarded as
significant overhead either.

The visible overhead is that we start a dedicated actor system in the
RestClusterClient. This is due to the implementation of LeaderServer &
LeaderClient being based on Akka. Conceptually it could be any kind of
service, but we always introduce some overhead.

**Contrast with current implementations**

The LeaderServer-based implementation can be regarded as a
location-transparent embedded implementation, so there isn't much contrast.
Also, the embedded implementation is only used in the MiniCluster scenario,
where an actor system is already running, so there are no significant
performance concerns.

As for the pre-configured implementation, namely StandaloneHaServices, I
agree that it is dead simple. But apart from the benefit of unification, it
is becoming unrealistic to require users to pre-configure the port of the
job manager, especially in cloud-native scenarios. Although the specific
implementation couples the address and port of the LeaderServer with those
of the job manager, this is not a fundamental constraint. Thus, the
LeaderServer-based implementation is more flexible for evolution.

Best,
tison.



Re: [DISCUSS] Retrieval services in non-high-availability scenario

Till Rohrmann
Hi Tison,

thanks for the detailed response. I put some comments inline:

On Tue, Sep 10, 2019 at 10:51 AM Zili Chen <[hidden email]> wrote:

> Hi Till,
>
> Thanks for your reply. I agree point 3 and 4 in your email worth a
> separated
> thread to discuss. Let me answer your questions and concerns in point 1 and
> 2
> respectively.
>
> 1.Lifecycle of LeaderServer and requirement to implement it
>
> LeaderServer starts on cluster entrypoint and its lifecycle is bound to the
> lifecycle of cluster entrypoint. That is, when the cluster entrypoint
> starts,
> a LeaderServer also starts; and LeaderServer gets shut down when the
> cluster
> entrypoint gets shut down. This is because we need to provide services
> discovery
> during the cluster is running.
>
> For implementation part, conceptually it is a service running on cluster
> entrypoint which holds in memory services information and can be
> communicatied
> with. As our internal specific implementation, LeaderServer is an actor
> running
> on the actor system running on cluster entrypoint, which is referred as
> `commonRpcService`. It is just another unfenced rpc endpoint and required
> no extra changes to the existing interfaces.
>
> Apart from LeaderServer, there is another concept in this implementation,
> the
> LeaderClient. LeaderClient forwards register request from election service
> and
> retrieval service; forwards leader changed message from LeaderServer. As
> our
> specific implementation, LeaderClient is an actor and runs on cluster
> entrypoint, task manager and cluster client.


I think these kinds of changes to the ClusterEntrypoint deserve a separate
design and discussion.

> (1). cluster entrypoint
>
> The lifecycle of LeaderClient is like LeaderServer.
>
> (2). task manager
>
> The lifecycle of LeaderClient is bound to the lifecycle of task manager.
> Specifically, it runs on `rpcService` starts on task manager runner and
> stops
> when the service gets shut down.
>
> (3). cluster client
>
> The lifecycle of LeaderClient is bound to the ClusterClient. With our
> codebase,
> only RestClusterClient should do the adaptation. When start ClientHAService
> based on LeaderClient, it starts a dedicated rpc service on which the
> LeaderClient runs. The service as well as the LeaderClient gets shut down
> on
> RestClusterClient closed, where ClientHAService#close called. It is a
> transparent implementation inside a specific ClientHAService; thus also, no
> changes to the existing interfaces.
>
> 2. The proposal to replace existing non-ha services
>
> Well, I see your concerns on replace existing stable services hurriedly
> with
> a new implementation. Here I list the pros and cons of this replacement. If
> we
> agree that it does good I can provide an neat and full featured
> implementation
> for preview and see concretely what we add and what we gain. For
> integration,
> we can then first integrate with MiniCluster and later.
>
> pros:
>
> + We don't need to pass the address of job manager among slot request.
>
> With the new implementation retriever running on task manager registers
> itself
> on the LeaderServer which has a global static address. And the retriever
> retrieves the address of job manager based on JobID. It is not only unify
> the
> interfaces #getJobManagerLeaderRetriever, but reduce the cost on job
> manager
> switched.
>

Why would the job manager switch its address in the non-ha case? We don't
support this if I'm not mistaken. Moreover, why would this change unify the
HighAvailabilityServices interface? At the moment there is
only getJobManagerLeaderRetriever(JobID jobID, String
defaultJobManagerAddress) which should be used. I acknowledge that it could
save us the second parameter, but I think the benefits would be minor atm.

>
> Currently, when job manager lost leadership, slots offered to the old job
> manager
> are unaware of it immediately. They don't get released until heartbeat from
> job manager timeout. With LeaderServer based implementation, LeaderServer
> notifies LeaderClient once job manager lost leadership.
>

This could indeed be an additional signal for the system. However, if the
LeaderServer runs as part of the ClusterEntrypoint, then it will rarely, if
ever, happen that the LeaderServer is still running while the JM has
died/lost leadership.

>
> + We have a unified implementation in non-ha scenario
>
> It can be regarded as a location transparent embedded implementation.
>

Why do we have to remove the StandaloneHaServices for that? The
implementation of one service should not be relevant for the others.

>
> + We have a unified view of high-availability services
>
> LeaderServer based implementation follows the same view of ZooKeeper based
> implementation. Since these high-availability services don't have natural
> difference from one to the other, we can instead naturally handle them
> under
> a unified view.
>

I don't get this point tbh. Why do you want to impose a ZooKeeper based
view on ha services if there is a more general one which does not restrict
us in any way?

>
> I know that we should still configure the address of LeaderServer, but now
> it
> is more like connect string of ZooKeeper, instead of address of internal
> component. In fact, then we can deprecate configuration of job manager port
> and auto detect a port as we do in ZooKeeper based scenario.
>

Auto selecting the port won't work because the clients need to know where
to connect to. Moreover, whether you call it quorum address/connect string
or JM address and port is effectively the same. I admit that we should
clean it up but at the end of the day it's all the same.

>
> cons:
>
> - Overhead on client side
>
> The overhead of transmitting job manager address is lower enough so I don't
> list
> it as valid pros. Correspondingly, messages between actors among existing
> actor
> system are regarded as significant overhead.
>
> The visible overhead is that we start a dedicated actor system in
> RestClusterClient. It is due to the implementation of LeaderServer &
> LeaderClient based on akka. Conceptually it can be any services but we
> always
> introduce some overhead.
>
> **contrast to current implementations**
>
> LeaderServer based implementation can be regarded as a location transparent
> embedded implementation. Thus there isn't too many contrasts. Also,
> embedded
> implementation is used only in MiniCluster scenario where an actor system
> is
> already running, so there isn't significant performance concerns.
>
> As for pre-configured implementation, named StandaloneHaServices, I agree
> that
> it is deadly simple. But apart from the benefit of unification, it is
> gradually
> unrealistic to require users pre-configure the port of job manager,
> especially
> on cloud native scenario.


How would this change with the LeaderServer? Users still need to configure
its address (which includes the hostname and port).


> Although the specific implementation couple the
> address and port of LeaderServer and that of job manager, it is not a
> fundamental constraint. Thus, LeaderServer based implementation is more
> flexible for evolution.
>

Yes this makes sense.

I think it makes sense to add a new HighAvailabilityServices implementation
based on what you've described. However, what I don't understand is why it
is so important to remove the existing non-ha HighAvailabilityServices. I
think we can add a new HAServices implementation which provides all the
things you've described without touching the others. This also has the
benefit of doing things in incremental steps instead of trying to do
everything at once, which is usually a recipe for disaster.

Cheers,
Till





Re: [DISCUSS] Retrieval services in non-high-availability scenario

tison
Hi Till,

Thanks for your quick reply. I'd like to narrow the intention of this
thread to what I posted above:

> Well, I see your concerns about hurriedly replacing existing stable
> services with a new implementation. Here I list the pros and cons of this
> replacement. If we agree that it does good, I can provide a neat and
> full-featured implementation for preview so we can see concretely what we
> add and what we gain. For integration, we can then first integrate with
> MiniCluster and go further later.

To clarify, the intention of this thread is narrowed to introducing a new
HighAvailabilityServices implementation based on the LeaderServer described
above. For now, we introduce such an implementation aimed at the
MiniCluster scenario; it is a location-transparent version of
EmbeddedHaServices. It would serve as the EmbeddedHaServices does and
remain flexible for evolution. Let's defer all topics about concrete
evolutions until the implementation converges and becomes stable.

A quick answer to a point above that possibly raises confusion:

> Why would the job manager switch its address in the non-ha case? We don't
> support this if I'm not mistaken.

Yes, we don't support this, because we fail the whole dispatcher / resource
manager component on job manager failures. It is less than awesome, since
we could let the Dispatcher, as the supervisor, launch a new job manager to
execute the job. However, as described above, let's defer all topics about
concrete evolutions until the implementation converges and becomes stable.

Best,
tison.

