Hi all
I've been working on a Parameter Server for Flink, and I have some basic functionality up: it supports registering a key, plus setting, updating, and fetching a parameter.

The idea is to start a Parameter Server somewhere and pass its address and port in a configuration so that another actor system can access it. Right now, I have a standalone module for this, named flink-server, under staging.

There is a {{ParameterClient}} which allows users to perform the above operations in a blocking fashion by waiting on a result from the server. You can look at the code here:

https://github.com/apache/flink/compare/master...sachingoel0101:parameter_server
[It is heavily derived from the JobManager implementation.]

One obvious next step is to ensure there are several servers which can serve data to users. This can also help achieve redundancy, by copying data over several servers and keeping them synchronized.

1. We can follow a slave model where starting a server anywhere starts a server on all slave machines. After this, I plan to copy each key-value pair onto several machines by computing hashes [of the key and the servers' UUIDs] modulo #servers. This way every server knows exactly where all the keys reside. However, this has a problem at the time of failures: if a server fails, we need to recompute the modulo values and redistribute almost all of the data to maintain redundancy.

2. Another method is, for every TaskManager started, to start one server inside the same actor system; this server would handle all data transfers for the tasks running inside that particular TaskManager. This way, whenever a machine fails, the JobManager at least knows and can notify the other TaskManagers and their servers of the failure of their fellow server. Since the JobManager maintains a list of the servers/task-managers, we can maintain an indexed list of servers very easily. Then it's just a matter of mapping a key to an index in the JobManager's instance list. [Of course, I'm assuming it would be hard to assign indexes to servers in a standalone fashion such that everyone has the exact same view.]

I'm more in favor of 2, since we ultimately need to utilize this in iterative algorithms, and that will require integration into the TaskManager and runtime context anyway. Plus, having a master to control everything makes everything easy. :')

What do you guys think?

Cheers!
Sachin

PS. Sorry about the long email on a weekend.

--
Sachin Goel
Computer Science, IIT Delhi
m. +91-9871457685
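To make the failure problem in option 1 concrete, here is a minimal sketch (hypothetical names, not code from the branch) of modulo-based key placement. It counts how many keys change owner when the server count drops by one:

```java
public class ModuloPlacement {
    // Map a key to a server index by hashing modulo the number of servers.
    // Math.floorMod keeps the result non-negative even for negative hash codes.
    public static int serverFor(String key, int numServers) {
        return Math.floorMod(key.hashCode(), numServers);
    }

    public static void main(String[] args) {
        int before = 4, after = 3; // one server fails
        int moved = 0, total = 1000;
        for (int i = 0; i < total; i++) {
            String key = "param-" + i;
            if (serverFor(key, before) != serverFor(key, after)) {
                moved++;
            }
        }
        // With plain modulo placement, most keys change owner when
        // the server count changes, so most data must be redistributed.
        System.out.println("moved " + moved + " of " + total + " keys");
    }
}
```

This illustrates why option 1 is expensive on failure: the modulo changes for nearly every key, not just the keys that lived on the failed server.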
Very interesting addition, Sachin!
I cc-ed Nam-Luc Tran, who recently opened a pull request with a draft for SSP iterations in Flink, which usually work closely together with a parameter server.

I think that a parameter server is an orthogonal piece to Flink, and should be decoupled from the runtime. But it would be great to have some common tooling and abstraction for this:

- Sachin's parameter server is built on Akka
- Nam-Luc used Apache Ignite for distributed key/value storage
- There are several other dedicated parameter server projects (such as https://github.com/dmlc/parameter_server)

How about creating a common abstraction with an interface that supports startup of the distributed model store, get/update, staleness, ...? The different technologies would then be pluggable behind this interface.

Since this is largely decoupled from Flink, and probably involves a few people, it might even make sense to create a dedicated GitHub project, and later add it as a whole to Flink (or keep it independent, whatever works better).

What do you think?

Greetings,
Stephan

On Sat, Aug 8, 2015 at 4:04 AM, Sachin Goel <[hidden email]> wrote:
> [...]
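The common abstraction Stephan proposes might look something like the following sketch. All names here are hypothetical, and the in-memory class merely stands in for a real backend (Akka, Apache Ignite, a dedicated parameter server, ...):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BinaryOperator;

// Hypothetical pluggable interface: startup, get/update; staleness
// handling is omitted here for brevity.
interface ParameterStore<K, V> {
    void start();                // bring up the backing store
    V get(K key);                // fetch the current value of a parameter
    void update(K key, V delta); // merge an update into the stored value
}

// Minimal in-memory implementation, standing in for a real backend.
class LocalParameterStore<K, V> implements ParameterStore<K, V> {
    private final Map<K, V> store = new ConcurrentHashMap<>();
    private final BinaryOperator<V> merge; // how deltas combine with old values

    LocalParameterStore(BinaryOperator<V> merge) { this.merge = merge; }

    public void start() { /* nothing to start locally */ }
    public V get(K key) { return store.get(key); }
    public void update(K key, V delta) { store.merge(key, delta, merge); }
}
```

Usage would be along the lines of `new LocalParameterStore<String, Double>(Double::sum)` for additive gradient updates; the different technologies would each provide their own implementation behind the same interface.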
Hi Stephan
I opened a PR for a version integrated with the Flink runtime itself [which is a lot more evolved than the branch I mentioned on this thread :') ]:

https://github.com/apache/flink/pull/1003

I was more inclined towards an integrated implementation and finished it over the weekend. The main idea is that one could easily use Apache Ignite etc. for the server, and you're spot on in saying that we should provide an abstraction to connect those. But, like you said in the discussion on #967, including a big dependency in Flink wasn't very encouraging. I've implemented a version using only Akka, and it can easily be made a core component which is accessible to the user from the runtime context.

Furthermore, I've implemented two update strategies: sequential consistency [batch] and eventual consistency [asynchronous]. I'm still working on stale synchronous. Have a look at the PR and let me know your thoughts.

I had a look at an ongoing discussion on the Spark issues page about a Parameter Server, and it does appear more suitable to have a Parameter Server in the core API. For optimization, however, we can allow the user to specify in the configuration file whether they need it turned on or not.

Cheers!
Sachin

--
Sachin Goel
Computer Science, IIT Delhi
m. +91-9871457685

On Mon, Aug 10, 2015 at 11:12 PM, Stephan Ewen <[hidden email]> wrote:
> [...]
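For reference, the stale synchronous model Sachin is still working on can be sketched as a per-worker clock with a bounded gap: a worker may keep reading possibly-stale parameters as long as it is no more than a fixed staleness ahead of the slowest worker. This is a hypothetical illustration, not code from the PR:

```java
import java.util.Arrays;

// Sketch of the stale synchronous parallel (SSP) condition: a worker
// may proceed without waiting for stragglers as long as its clock is
// within `staleness` ticks of the slowest worker's clock.
class SspClock {
    private final int[] clocks;  // per-worker iteration counters
    private final int staleness; // allowed clock gap

    SspClock(int numWorkers, int staleness) {
        this.clocks = new int[numWorkers];
        this.staleness = staleness;
    }

    void tick(int worker) { clocks[worker]++; }

    int minClock() { return Arrays.stream(clocks).min().orElse(0); }

    // True if the given worker may read without blocking on stragglers.
    boolean mayProceed(int worker) {
        return clocks[worker] - minClock() <= staleness;
    }
}
```

With staleness 0 this degenerates to the sequential [batch] strategy, and with an unbounded staleness it behaves like the eventual [asynchronous] one, which is why SSP sits naturally between the two strategies already in the PR.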