Hi guys,

Flink holds on to all of its resources all the time. That wastes resources, especially in a shared cluster. For example, when using Flink on YARN, the resources cannot be returned to the ResourceManager even if no job graph is running. So I want to know: is it possible to add some APIs to the scheduler to request a resource (slot) and release a resource (slot)? These APIs could then be customized for different circumstances.

Thanks a lot!
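A rough sketch of the kind of scheduler API being proposed here, with purely made-up names (nothing like this exists in Flink today):

// Hypothetical sketch only -- this interface and these methods
// do not exist in Flink; they just illustrate the proposal.
public interface SlotAwareScheduler {

    /** Ask the underlying resource manager (e.g. YARN's RM) for more slots. */
    void requestSlots(int numberOfSlots);

    /** Hand slots back to the resource manager once no job graph needs them. */
    void releaseSlots(int numberOfSlots);
}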
Hi,
I think for more details on giving back resources of a running cluster we have to wait for Robert's opinion. In the meantime, you can also just run a single job that brings up some YARN containers and then releases them afterwards, using this: https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn

Cheers,
Aljoscha
That's great! Is there any documentation? I am very interested in this.
Thanks!
Hi MaGuoWei,
If I understand correctly, you're looking for a way to have a job manager (master) continuously running which requests YARN containers for the task managers (workers) on the fly. This is currently not supported by Flink, although you can add or remove task managers while the cluster is running. The job manager distributes tasks to the task managers which are available at scheduling time.

As Aljoscha mentioned, the most flexible way of sharing resources in a YARN environment is to start a per-job cluster for each job.

Cheers,
Max
Hi MaGuoWei,
would you like to have this done automatically by Flink, or based on some user input? Adding commands to ./bin/yarn-session.sh to change the cluster size is quite easy. However, reducing the cluster size while a job is running will fail the job.

Making this automatic is much harder.
Thanks, all you guys. Now I know I can achieve this goal by creating a cluster per topology and estimating the cluster size by analyzing the JobGraph (or from some user input). But I think it would be more elegant if Flink abstracted a common resource API with operations such as get/release/deploy. :-)
Thanks.
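To make the get/release/deploy idea above concrete, a minimal sketch of such an abstraction could look like the following. All types and method names are hypothetical placeholders, deliberately independent of YARN or Mesos:

// Hypothetical placeholder describing the requested CPU/memory.
final class ResourceProfile {
    final int cpuCores;
    final int memoryMb;

    ResourceProfile(int cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }
}

// Hypothetical handle for a YARN/Mesos container.
final class Container {
    final String id;

    Container(String id) {
        this.id = id;
    }
}

// Sketch of the common resource API; none of this exists in Flink.
interface ClusterResourceManager {

    /** Acquire a container matching the profile from YARN/Mesos. */
    Container getResource(ResourceProfile profile) throws Exception;

    /** Return a no-longer-needed container to the resource manager. */
    void releaseResource(Container container);

    /** Start a TaskManager process inside an acquired container. */
    void deployTaskManager(Container container) throws Exception;
}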
I think what MaGuoWei is looking for is "intra-job" YARN elasticity.
Adding these hooks is also going to be important for triggering streaming jobs to scale in/out at runtime.
To properly abstract the resource manager behind an interface, it would be
good to see what the common overlap between the YARN integration and the pending Mesos integration is. Can anyone working on this chime in?
Hi Stephan / others interested,
I have been working on the flink-mesos integration and would like to share some thoughts about the commonalities with the flink-yarn integration.

* Both the flink-mesos and flink-yarn integrations as they stand today can be considered "coarse-grained" scheduling integrations. This means that the tasks that are spawned (the task managers) are long-lived.

* MaGuoWei is referring to something (as correctly identified by Stephan) that I like to call "fine-grained" scheduling integration, where the task managers are relinquished by the framework when they aren't being utilised by Flink. This means that when the next job is executed, the job manager and/or framework will spawn new task managers. This also carries the implied requirement that each taskManager runs one task and is then discarded.

* Coarse-grained scheduling is preferable when we want interactive (sub-second) response, so that waiting for a resource offer to be accepted plus the spin-up time of a new taskManager JVM is not acceptable. The downside is that long-running tasks may lead to underutilisation of the shared cluster.

* Fine-grained scheduling is preferable when a little delay (due to starting a new taskManager JVM) is acceptable. This means we will have higher utilisation of the cluster in a shared setting, as resources that aren't being used are relinquished. But we need to be a lot more careful with this approach. Some of the cases that I can think of (see the sketch after this list):
  * The jobManager/integration-framework may need to monitor the utilisation of the taskManagers and kill off taskManagers based on some cool-down timeout.
  * The taskManagers that are being killed off may hold resources that are needed by other tasks, so they can't always be killed off (files/intermediate results etc.). This means there needs to be some sort of "are you idle?" handshake.
  * I like the "fine-grained" mode, but there may need to be a middle ground where tasks are "coarse-grained", i.e. run multiple operators, and once idle for a certain amount of time are reaped/killed off by the jobManager/integration-framework.

* Ideally, we would want to isolate the logic (a general scheduler) that says "get me a slot meeting constraints X" into one module, which utilises another module (YARN or Mesos) that takes such a request and satisfies the needs of the former. This idea is somewhat inspired by the way this separation exists in Apache Spark, where it seems to work out well.

* I don't know the codebase well enough to say where these things should go. Based on my reading of the overall architecture of the system, there is nothing that can't be satisfied by flink-runtime, and it *should* not need any detailed access to the execution plan. I'll defer this to someone who knows the internals better.
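A minimal sketch of the cool-down/handshake idea from the list above. Every type and method here is an assumption made up for illustration, not an existing Flink API:

import java.time.Duration;
import java.time.Instant;

// Sketch of the "are you idle?" handshake; all names are hypothetical.
final class IdleTaskManagerReaper {

    /** Hypothetical view of a worker, as seen by the integration framework. */
    interface TaskManagerHandle {
        boolean hasRunningTasks();
        boolean holdsLiveData();   // files, intermediate results, etc.
        Instant idleSince();
        void shutDown();           // relinquishes the container
    }

    private final Duration coolDown;

    IdleTaskManagerReaper(Duration coolDown) {
        this.coolDown = coolDown;
    }

    /** Called periodically by the jobManager/integration-framework. */
    void sweep(Iterable<TaskManagerHandle> taskManagers) {
        Instant now = Instant.now();
        for (TaskManagerHandle tm : taskManagers) {
            boolean pastCoolDown =
                Duration.between(tm.idleSince(), now).compareTo(coolDown) >= 0;
            // A worker is only reaped if it is idle past the cool-down AND
            // holds no data that other tasks may still need.
            if (pastCoolDown && !tm.hasRunningTasks() && !tm.holdsLiveData()) {
                tm.shutDown();
            }
        }
    }
}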
Hi,
I'm sorry for the late reply, I'm still working through a long email backlog from a one-week vacation ;)

Thank you for the long reply in this thread. Your observations are very good. I think we should indeed work towards more fine-grained "intra-job" elasticity. Let me comment on some of your statements below.

> * The taskManagers that are being killed off may hold resources that are needed by other tasks, so they can't always be killed off (files/intermediate results etc.). This means there needs to be some sort of "are you idle?" handshake.

I think we have to distinguish between streaming and batch API jobs here.

- For deployed streaming jobs, it's usually impossible to take away TaskManagers, because we are working on infinite streams (tasks are never done). The simplest thing we can do is stop machines that no tasks are deployed to. As Stephan mentioned, dynamic scaling of streaming jobs is certainly interesting for the future. There, we would need a component implementing some sort of scaling policy, for example based on throughput, load or latency (see the sketch below). For up- or down-scaling, we would then redeploy the job. For this feature, we certainly need nicely abstracted APIs for YARN and Mesos to alter the running cluster.

- Batch jobs are usually executed in a pipelined (streaming) fashion; for this we would need to execute them in a batch fashion instead, since otherwise tasks do not finish one after another. Flink's runtime already has support for that. With some additional logic that lets us recognize when an intermediate dataset has been fully consumed by downstream tasks, we can safely deallocate machines in a Flink cluster. I think such logic can be implemented in the JobManager's scheduler.

> * Ideally, we would want to isolate the logic (a general scheduler) that says "get me a slot meeting constraints X" into one module, which utilises another module (YARN or Mesos) that takes such a request and satisfies the needs of the former.

The JobManager of Flink has a component which schedules a job graph in the cluster. I think right now the system assumes that a certain number of machines and processing slots are available. But it should not be too difficult to have something like "fake" machines and slots there which are allocated on demand as needed (so that you basically give the system an upper limit of resources to allocate).

I agree with Stephan that a good first step towards fine-grained elasticity would be a common interface for both YARN and Mesos. For YARN, there are currently these (pretty YARN-specific) abstract classes:

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java

I'd suggest that we first merge the "flink-mesos" integration once it's done. After that, we can try to come up with a common interface.

Are you interested in working towards that feature after the "flink-mesos" integration?

Best,
Robert
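For illustration, the scaling-policy component mentioned above might look roughly like this. All names are assumptions for the sketch, not existing Flink APIs:

// Sketch of a pluggable scaling policy; every name here is hypothetical.
interface ScalingPolicy {

    /** Metrics snapshot the policy decides on (throughput, load, latency). */
    final class JobMetrics {
        final double recordsPerSecond;
        final double cpuLoad;       // 0.0 .. 1.0
        final double latencyMillis;

        JobMetrics(double recordsPerSecond, double cpuLoad, double latencyMillis) {
            this.recordsPerSecond = recordsPerSecond;
            this.cpuLoad = cpuLoad;
            this.latencyMillis = latencyMillis;
        }
    }

    /** Desired change in TaskManager count: > 0 scale out, < 0 scale in. */
    int desiredDelta(JobMetrics metrics, int currentTaskManagers);
}

// Example policy: scale out when latency grows, scale in when underutilised.
final class LatencyBasedPolicy implements ScalingPolicy {
    @Override
    public int desiredDelta(JobMetrics m, int current) {
        if (m.latencyMillis > 1000.0) {
            return 1;                        // falling behind: add a worker
        }
        if (m.cpuLoad < 0.2 && current > 1) {
            return -1;                       // idle capacity: remove a worker
        }
        return 0;                            // no change needed
    }
}

On a non-zero delta, the framework would then redeploy the job on the resized cluster, as described above.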
Hi,
one thing to also keep in mind here is that TaskManagers might have to keep intermediate data even after a job is finished, for example if the user has an interactive scala-shell session where they are exploring some data and transforming it in several steps as they go.

Cheers,
Aljoscha
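One way to track such retention would be to let a session "pin" cached results on a worker so it is not released between jobs. A tiny sketch under that assumption; all names are made up:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch only: tracks which cached intermediate results are still
// referenced, so an idle TaskManager is not released too early.
final class IntermediateResultRegistry {

    // taskManagerId -> ids of result partitions a session still references
    private final Map<String, Set<String>> pinned = new HashMap<>();

    /** An interactive session pins a result it may reuse in a later step. */
    void pin(String taskManagerId, String resultPartitionId) {
        pinned.computeIfAbsent(taskManagerId, k -> new HashSet<>())
              .add(resultPartitionId);
    }

    /** Called when the session no longer needs the cached result. */
    void unpin(String taskManagerId, String resultPartitionId) {
        Set<String> ids = pinned.get(taskManagerId);
        if (ids != null) {
            ids.remove(resultPartitionId);
            if (ids.isEmpty()) {
                pinned.remove(taskManagerId);
            }
        }
    }

    /** A TaskManager may only be relinquished if nothing is pinned on it. */
    boolean safeToRelease(String taskManagerId) {
        return !pinned.containsKey(taskManagerId);
    }
}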
Hi Robert,
I agree with everything that you and Stephan are saying. I haven't looked into the Flink codebase and the papers/design docs closely enough to comment at a finer level, so maybe that's the first piece of homework I need to do (I'd appreciate pointers for that).

And yes, I would definitely be interested in owning/maintaining/extending the flink-mesos integration.

-- Ankur Chauhan
Hi Ankur,
I am not aware of any up-to-date papers about the internals of Flink, but the links on this wiki page contain a lot of helpful material: https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals

I'm very happy to see that you are interested in contributing to and maintaining the flink-mesos integration :)

Robert