Parameter Server for Flink

3 messages
Parameter Server for Flink

Sachin Goel
Hi all,
I've been working on a Parameter Server for Flink and have some basic
functionality up: it supports registering a key, plus setting, updating,
and fetching a parameter.

The idea is to start a Parameter Server somewhere and pass its address and
port in a configuration so that another actor system can access it. Right
now, I have a standalone module for this, named flink-server, under staging.

There is a {{ParameterClient}} which lets users perform the above
operations in a blocking fashion by waiting on a result from the server.
You can look at the code here:
https://github.com/apache/flink/compare/master...sachingoel0101:parameter_server
[It is heavily derived from the JobManager implementation.]
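For readers who haven't opened the branch, the blocking client surface
described above can be sketched roughly as follows. This is a minimal
in-memory stand-in; the method names mirror the operations listed in this
email, not necessarily the actual code:

```python
class ParameterServerStub:
    """In-memory stand-in for the remote, actor-based server.

    In the real design each call would block while waiting on a
    response message from the server; here the 'server' is just a
    local dict, so the blocking behavior is implicit.
    """

    def __init__(self):
        self._store = {}

    def register(self, key, initial_value):
        # Register a key with an initial parameter value.
        self._store[key] = initial_value

    def set(self, key, value):
        # Overwrite the parameter stored under `key`.
        self._store[key] = value

    def update(self, key, delta):
        # Apply an additive update to the stored parameter.
        self._store[key] = self._store[key] + delta

    def fetch(self, key):
        # Blocking read: return the current value for `key`.
        return self._store[key]
```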

One obvious next step is to run several servers that can serve data to
users. This also helps achieve redundancy, by copying data across several
servers and keeping them synchronized.

1. We can follow a slave model, where starting a server anywhere starts a
server on every slave machine. After this, I plan to copy each key-value
pair onto several machines by computing hashes [of the key and the servers'
UUIDs] modulo the number of servers. This way every server knows exactly
where all the keys reside. However, this has a problem at failure time:
if a server fails, we need to recompute the modulo values and redistribute
almost all of the data to maintain redundancy.
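The redistribution cost of plain hash-modulo placement can be seen in a
few lines (an illustrative simulation, not code from the branch):

```python
import hashlib

def owner_index(key, num_servers):
    # A deterministic hash of the key, taken modulo the number of
    # servers, picks the server index that holds this key.
    digest = hashlib.md5(key.encode()).hexdigest()
    return int(digest, 16) % num_servers

keys = ["param-%d" % i for i in range(1000)]

# Placement with 4 servers, then after one server has failed.
before = {k: owner_index(k, 4) for k in keys}
after = {k: owner_index(k, 3) for k in keys}

# With plain modulo placement, most keys change owner after a
# failure; this is exactly the redistribution problem noted above.
moved = sum(1 for k in keys if before[k] != after[k])
```

A key keeps its owner only when its hash happens to land on the same index
modulo both 4 and 3, so roughly three quarters of the keys move.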

2. Another method: for every TaskManager started, one server should be
started inside the same actor system, and this server will handle all data
transfers for the tasks running inside that particular TaskManager. This
way, whenever a machine fails, the JobManager at least knows about it and
can inform the other TaskManagers and their servers of the failure of
their fellow server.
Since the JobManager maintains a list of the servers/TaskManagers, we can
maintain an indexed list of servers very easily. Then it's just a matter
of mapping a key to an index in the JobManager's instance list.
[Of course, I'm assuming it would be hard for the servers to assign
indexes among themselves in a standalone fashion such that everyone has
exactly the same view.]
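A toy model of that key-to-index mapping, assuming the master hands every
server the same ordered list (names and the neighbor-replication choice
are my own illustration, not from the branch):

```python
import hashlib

class ServerRegistry:
    """Toy model of the JobManager's indexed list of servers.

    Because the master distributes one ordered list, all servers
    agree on where each key lives without extra coordination.
    """

    def __init__(self, servers):
        self.servers = list(servers)

    def _index(self, key):
        digest = hashlib.md5(key.encode()).hexdigest()
        return int(digest, 16) % len(self.servers)

    def primary(self, key):
        # The server responsible for this key.
        return self.servers[self._index(key)]

    def replica(self, key):
        # One way to get redundancy: keep a copy on the next
        # server in the master's list.
        return self.servers[(self._index(key) + 1) % len(self.servers)]
```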

I'm more in favor of 2, since we ultimately need to use this in iterative
algorithms, and that will require integration with the TaskManager and
runtime context anyway. Plus, having a master to control everything makes
everything easier. :')

What do you guys think?

Cheers!
Sachin

PS. Sorry about the long email on a weekend.

-- Sachin Goel
Computer Science, IIT Delhi
m. +91-9871457685

Re: Parameter Server for Flink

Stephan Ewen
Very interesting addition, Sachin!

I cc-ed Nam-Luc Tran, who recently opened a pull request with a draft for
SSP iterations in Flink, which usually work closely together with a
parameter server.

I think that a parameter server is an orthogonal piece to Flink, and should
be decoupled from the runtime.
But it would be great to have some common tooling and abstraction for this:

 - Sachin's parameter server is built on Akka
 - Nam-Luc used Apache Ignite for distributed Key/Value storage
 - There are several other dedicated parameter server projects (such as
https://github.com/dmlc/parameter_server)


How about creating a common abstraction with an interface that supports
startup of the distributed model store, get/update, staleness, ... ?
The different technologies would then be pluggable behind this interface.
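One rough sketch of what such a pluggable interface could look like
(method names are illustrative; each backend, whether Akka, Ignite, or a
dedicated parameter server, would implement it):

```python
from abc import ABC, abstractmethod

class ModelStore(ABC):
    """Illustrative common interface for a distributed model store."""

    @abstractmethod
    def start(self):
        """Bring up the distributed store (actors, caches, ...)."""

    @abstractmethod
    def get(self, key):
        """Fetch the current value of a parameter."""

    @abstractmethod
    def update(self, key, delta):
        """Apply an update to a parameter."""

    @abstractmethod
    def max_staleness(self):
        """Bound on how stale a read may be (0 = fully consistent)."""

class LocalStore(ModelStore):
    """Trivial single-process backend, useful for testing the interface."""

    def __init__(self):
        self._data = {}

    def start(self):
        pass

    def get(self, key):
        return self._data.get(key, 0.0)

    def update(self, key, delta):
        self._data[key] = self._data.get(key, 0.0) + delta

    def max_staleness(self):
        return 0
```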

Since this is largely decoupled from Flink, and probably involves a few
people, it might even make sense to create a dedicated GitHub project,
and later add it as a whole to Flink (or keep it independent, whatever
works better).

What do you think?

Greetings,
Stephan




Re: Parameter Server for Flink

Sachin Goel
Hi Stephan,
I opened a PR for a version integrated with the Flink runtime itself
[which is a lot more evolved than the branch I mentioned on this
thread :') ]:
https://github.com/apache/flink/pull/1003
I was more inclined towards an integrated implementation and finished it
over the weekend. The main idea is that one could easily use Apache Ignite
etc. for the server, and you're spot on in saying that we should provide
an abstraction to connect those. But, as you said in the discussion on
#967, pulling a big dependency into Flink wasn't very encouraging.
I've implemented a version using only Akka, and it can easily be made a
core component accessible to the user from the runtime context.
Furthermore, I've implemented two update strategies: sequential
consistency [batch] and eventual consistency [asynchronous]. I'm still
working on stale synchronous. Have a look at the PR and let me know your
thoughts.
I had a look at an ongoing discussion on the Spark issues page about the
Parameter Server, and it does appear more suitable to have a Parameter
Server in the core API. For optimization, however, we can allow the user
to specify in the configuration file whether they need it turned on.
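The three consistency modes mentioned above differ only in how far a fast
worker may run ahead of the slowest one. The admission rule can be
sketched like this (my paraphrase of the SSP idea, not code from the PR):

```python
def may_advance(worker_clock, slowest_clock, staleness_bound):
    # A worker may start iteration `worker_clock` only if it is at
    # most `staleness_bound` clocks ahead of the slowest worker.
    #   staleness_bound = 0            -> sequential/batch consistency
    #   staleness_bound = infinity     -> eventual/asynchronous
    #   0 < staleness_bound < infinity -> stale synchronous (SSP)
    return worker_clock - slowest_clock <= staleness_bound
```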

Cheers!
Sachin

-- Sachin Goel
Computer Science, IIT Delhi
m. +91-9871457685
