Hi all!
Here comes a pretty big FLIP: "Improvements to the Flink Deployment and Process Model", to better support Yarn, Mesos, Kubernetes, and whatever else Google, Elon Musk, and all the other folks will think up next.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077

It is a pretty big FLIP where I took input and thoughts from many people, like Till, Max, Xiaowei (and his colleagues), Eron, and others.

The core ideas revolve around:
- making the JobManager at its core a per-job component (handling multi-tenancy outside the JobManager)
- making resource acquisition and release more dynamic
- tying deployments more naturally to jobs where desirable

Let's get the discussion started...

Greetings,
Stephan
Hi Stephan,
Thanks for the nice wrap-up of ideas and discussions we had over the last months (not all on the mailing list, though, because we were just getting started with the FLIP process). The document is very comprehensive and explains the changes in great detail, even down to the message-passing level.

What I really like about the FLIP is that we delegate multi-tenancy away from the JobManager to the resource management framework and the dispatchers. This will help make the JobManager component cleaner and simpler. The prospect of having the user jars directly on the system classpath of the workers, instead of dealing with custom class loaders, is very nice.

The model we have for acquiring and releasing resources wouldn't work particularly well with all the new deployment options, so +1 on a new task slot request/offer system and +1 for making the ResourceManager responsible for TaskManager registration and slot management. This is well aligned with the initial idea of the ResourceManager component.

We definitely need good testing for these changes, since the possibility of bugs increases with the number of additional messages introduced.

The only thing that bugs me is whether we make the standalone mode a bit less nice to use. The initial bootstrapping of the nodes via the local dispatchers and the subsequent registration of TaskManagers and allocation of slots could cause some delay. It's not a major concern, though, because it will take little time compared to the actual job runtime (unless you run a tiny WordCount).

Cheers,
Max
Hello,
I just created a new FLIP which aims at exposing our metrics to the WebInterface.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface

Looking forward to feedback :)

Regards,
Chesnay Schepler
In reply to this post by mxm
The design looks great - it solves for very diverse deployment modes, allows for heterogeneous TMs, and promotes job isolation.
Some feedback:

*Dispatcher*
The dispatcher concept here expands nicely on what was introduced in the Mesos design doc (MESOS-1984), the most significant difference being the job-centric orientation of the dispatcher API. FLIP-6 seems to eliminate the concept of a session (or defines it simply as the lifecycle of a JM); is that correct? Do you agree I should revise the Mesos dispatcher design to be job-centric?

I'll be taking the first crack at implementing the dispatcher (for Mesos only) in MESOS-1984 (T2). I'll keep FLIP-6 in mind as I go.

The dispatcher's backend behavior will vary significantly for Mesos vs. standalone vs. others. Presumably a base class with concrete implementations will be introduced (a sketch follows after this message). To echo the FLIP-6 design as I understand it:

1) Standalone
   a) The dispatcher process starts an RM, dispatcher frontend, and "local" dispatcher backend at startup.
   b) Upon job submission, the local dispatcher backend creates an in-process JM actor for the job.
   c) The JM allocates slots as normal. The RM draws from its pool of registered TMs, which grows and shrinks due (only) to external events.

2) Mesos
   a) The dispatcher process starts a dispatcher frontend and "Mesos" dispatcher backend at startup.
   b) Upon job submission, the Mesos dispatcher backend creates a Mesos task (dubbed an "AppMaster") which contains a JM/RM for the job.
   c) The system otherwise functions as described in the Mesos design doc.

*Client*
I'm concerned about the two code paths that the client uses to launch a job (with-dispatcher vs. without-dispatcher). Maybe it could be unified by saying that the client always calls the dispatcher, and that the dispatcher is hostable either in the client or in a separate process. The only variance would be the client-to-dispatcher transport (local vs. HTTP).

*RM*
On the issue of RM statefulness, we can say that the RM does not persist slot allocations (the ground truth is in the TMs), but may persist other information (related to cluster manager interaction). For example, the Mesos RM persists the assigned framework identifier and per-task planning information (as is highly recommended by the Mesos development guide).

On RM fencing, I was already wondering whether to add it to the Mesos RM, so it is nice to see it being introduced more generally. My rationale is that the dispatcher cannot guarantee that only a single RM is running, because orphaned tasks are possible in certain Mesos failure situations. Similarly, I'm unsure whether YARN provides a strong guarantee about the AM.

*User Code*
Having job code on the system classpath seems possible in only a subset of cases, and the variability may be complex. How important is this optimization?

*Security Implications*
It should be noted that the standalone embodiment doesn't offer isolation between jobs; the whole system will have a single security context (as it does now).

Meanwhile, the 'high-trust' nature of the dispatcher in other scenarios is rightly emphasized. The fact that user code shouldn't be run in the dispatcher process (except in standalone) must be kept in mind. The design doc of FLINK-3929 (section C2) has more detail on that.

-Eron
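[Editor's sketch] To make the base-class idea above concrete, a minimal sketch, assuming hypothetical names (FLIP-6 does not prescribe this API; JobGraph and JobID are existing Flink types):

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;

// Minimal sketch of an abstract dispatcher; all names here are hypothetical.
public abstract class Dispatcher {

    /** Shared entry point: validate the submission, then hand off to the backend. */
    public final JobID submitJob(JobGraph job) throws Exception {
        if (job == null) {
            throw new IllegalArgumentException("no job graph submitted");
        }
        return startJob(job);
    }

    /**
     * Backend-specific launch:
     * standalone: spawn an in-process JobManager actor for the job (step 1b above);
     * Yarn/Mesos: launch a per-job AppMaster container/task (step 2b above).
     */
    protected abstract JobID startJob(JobGraph job) throws Exception;
}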
+1
I don't have much to say since this already seems very well worked out. Just some small remarks:

- This sentence that describes TaskManager behavior will probably have to be adapted for FLIP-1, correct? "Loss of connection to the JobManager results in triggering master-failure recovery (currently: cancel all tasks from that master)"
- For Docker mode there is this sentence: "To start a Flink job, one configures a service to start one container of the Job/JobManager image, and N containers of the TaskManager image." This can be achieved with Docker Compose. We already use this in the Docker image that we have in the Flink source.
- The design mentions that the ResourceManager should be long-running, in particular outliving the JobManager. However, this is only true for standalone mode and not for Yarn or Mesos, which I think will be the two more important deployment modes. In those two modes it basically becomes a sub-component of the JobManager. Should this be made more prominent in the description of the ResourceManager?

Cheers,
Aljoscha
In reply to this post by mxm
Thanks for the great proposal.
There are still two issues I'm concerned with and want to discuss.

#1 Who should decide the resources an operator uses, the user or the framework? For example, how much CPU or memory will my "map" operator cost? That seems a little too low-level for users; should we expose some APIs for this?

#2 Who decides how to combine slots into a real container in Yarn and Mesos mode? Currently, Flink has an optimization for resource utilization called SlotSharingGroup (a usage sketch follows after this message). It takes effect before Flink allocates resources: we combine as many operators as we can into one single *SharedSlot* (which I think is still a slot). It seems all the combination and optimization happen before we allocate resources, so should we distinguish between slots and containers (if we want to introduce that concept at all; I don't think it's needed by standalone mode)? If the answer is yes, it leads us to a situation where both the JobManager and the ResourceManager know how to utilize resources. For logic like SlotSharingGroup, it's more appropriate to let the Scheduler handle it, because the Scheduler has a lot of information about the JobGraph and the constraints on it. But for other logic that is more purely resource-aware or cluster-specific, we may consider letting the ResourceManager handle it. E.g., there is a limitation on Yarn's allocation: we can only allocate containers with an "integer" number of vcores, so it's not possible to have 0.1 or 0.2 vcores for now. We have bypassed this by combining some operators into one slot; otherwise it would waste resources. Still, I think it's better if only one role is aware of all resource utilization.

Thanks,
Kurt
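[Editor's sketch] For readers unfamiliar with the SlotSharingGroup mechanism Kurt describes, a minimal usage sketch via the slotSharingGroup() hint on the DataStream API (the group names are made up):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// By default, connected operators may be combined into one SharedSlot;
// assigning a different group name forces operators into separate slots.
public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("to", "be", "or", "not")
           .map(s -> s.toUpperCase())
           .slotSharingGroup("default")   // stays in the same slot pool as the source
           .filter(s -> s.length() > 2)
           .slotSharingGroup("isolated")  // forces the filter into its own slots
           .print();

        env.execute("slot sharing sketch");
    }
}

Note that operators inherit the group of their input unless a hint is set, which is why a single default group usually covers a whole job.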
In reply to this post by Chesnay Schepler-3
Hi!
Thanks for writing this up. I think it looks quite reasonable (I hope I understood the design correctly).

There is one point of confusion left for me, though: the MetricDumper and MetricSnapshot. I think it is just the names that confuse me here. It looks like they define a way to query the metrics in the MetricRegistry in a standard schema (independent of the scope formats). Should the "dumper" maybe be called "MetricsQueryService" or so? (The query service returns a MetricSnapshot, if I understand correctly.)

It would be great if the "query service" did not need metrics to be registered with it; that saves us some effort during startup/teardown. It looks as if the query service could just use the root-most component metric groups to walk the tree of whatever metrics are currently there and put them into the current snapshot (a sketch of such a walk follows below).

One open question that I have is: how do you know how to merge the metrics from the subtasks, for example in case you want a metric across subtasks?

In general, not transferring objects (only strings/numbers) would be preferable, because the WebMonitor may run in an environment where no user-code classloader can be used. It may run in the dispatcher (which must be trusted and cannot execute user code).

Greetings,
Stephan
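[Editor's sketch] A minimal sketch of the tree walk Stephan suggests. Flink's metric groups do not currently expose accessors like these (Chesnay points this out below), so the interface here is purely hypothetical:

import java.util.Map;

// Hypothetical stand-in for a component metric group; getAllMetrics() and
// getSubGroups() do not exist on Flink's AbstractMetricGroup today.
interface ComponentGroupSketch {
    Map<String, Object> getAllMetrics();        // metric name -> current value
    Iterable<ComponentGroupSketch> getSubGroups();
    String getScopeString();                    // e.g. "taskmanager.<id>.<job>"
}

final class MetricsQueryServiceSketch {
    /** Walks the group tree from a root-most component group into a flat snapshot. */
    static void collect(ComponentGroupSketch group, Map<String, Object> snapshot) {
        for (Map.Entry<String, Object> m : group.getAllMetrics().entrySet()) {
            snapshot.put(group.getScopeString() + "." + m.getKey(), m.getValue());
        }
        for (ComponentGroupSketch child : group.getSubGroups()) {
            collect(child, snapshot);
        }
    }
}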
The metrics transfer design document looks good to me. Thanks for your work, Chesnay :-)

I think the benefit of registering the metrics at the MetricDumper is that we don't have to walk through the hierarchy of metric groups to collect the metric values. Indeed, this comes with increased cost at start-up, but I'm not sure what the concrete impact on job performance is in these cases.

Cheers,
Till
Thank you for your feedback :)
Regarding names:

    The Dumper does not create a MetricSnapshot. The Dumper creates a
    list of key-value pairs: metric_name:value. A (single) MetricSnapshot
    exists in the WebRuntimeMonitor, into which the dumped list is inserted.

    So the Dumper creates a snapshot but not a MetricSnapshot, and the
    WebRuntimeMonitor contains a MetricSnapshot which isn't really a
    snapshot but more of a storage. The naming isn't the best.

    I'm not sure if "Service" really fits the bill; I associate a service
    with a separate thread running in the background.

Regarding merging of metrics:

    We are not merging any metrics right now. While Counters are easy to
    merge, for Gauges we may have to let the user choose in the
    WebInterface how they should be aggregated (a sketch of this
    asymmetry follows below).

    This is /not really/ a problem, in the sense that we don't have
    different versions overwriting each other:

    * JM/TM metrics don't have to be merged
    * task metrics can be kept on a per-subtask/operator level for now
      (the prototype exposes them as
      "<subtask_index>_<operator_name>_<metric_name>")
    * job metrics are currently only gathered on the JM, so no merging
      here either

Regarding transfer:

    Should we transfer numbers as numbers, or also as strings? I'm
    concerned about the efficiency of the whole thing; if we send some
    metrics as strings and some as numbers, we have to decide for every
    metric which option to take. That's why I was wondering whether to
    send everything as objects or everything as strings.

Regarding traversal of groups:

    Yes, we would save on startup/teardown time if we traversed the
    groups instead. However, the dumping itself would become more
    expensive this way, and since it is done by the TaskManager thread,
    I wanted to keep it as simple as possible.

    Also, there is currently no way to access the metrics contained in a
    group. We would have to add another method to the
    AbstractMetricGroup, which I would prefer not to do, as it can lead
    to concurrency issues during teardown.
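[Editor's sketch] To illustrate the asymmetry Chesnay describes: counters have an obvious merge (summation across subtasks), while a gauge does not. The aggregation choices below are made up for illustration:

import java.util.List;

final class MergeSketch {

    // Counters across subtasks merge naturally by summation.
    static long mergeCounters(List<Long> subtaskCounts) {
        long sum = 0;
        for (long c : subtaskCounts) {
            sum += c;
        }
        return sum;
    }

    // A gauge has no single obvious aggregate, so this would be a user
    // choice in the WebInterface; assumes at least one subtask value.
    static double mergeGauges(List<Double> values, String userChoice) {
        double result = values.get(0);
        for (double v : values.subList(1, values.size())) {
            if ("max".equals(userChoice)) {
                result = Math.max(result, v);
            } else if ("min".equals(userChoice)) {
                result = Math.min(result, v);
            } else {
                result += v; // fall back to summation
            }
        }
        return result;
    }
}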
Regarding transfer:
I think objects are fine, as long as they are not user-defined objects. We can limit it to String and subclasses of Number (a sketch of such a guard follows below).

Regarding traversal of groups:
I am still thinking here in terms of the paradigm that the metrics should impact the regular system as little as possible. Shifting work to the "query/dump" action is good in that sense, unless it means permanent re-construction of the names. The metric query endpoint could (should) be a separate actor from the TaskManager, in my opinion. That also solves the issue of blocking the TaskManager actor.

BTW: Could the Dumper simply be a special reporter that understands the component metric groups and does not use scope formats?
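[Editor's sketch] A minimal sketch of the restriction Stephan proposes, as a hypothetical helper on the sending side:

// Only String and Number values cross the wire; anything else is stringified,
// so the WebMonitor never needs a user-code classloader to read the values.
final class TransferGuardSketch {
    static Object sanitize(Object metricValue) {
        if (metricValue instanceof String || metricValue instanceof Number) {
            return metricValue;
        }
        return String.valueOf(metricValue); // user-defined gauge types become strings
    }
}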
Names are only constructed once, when the dumper is notified of the new metric (a sketch follows below). We don't have to use scope formats; we can hard-code the scope assembly into the ComponentMetricGroups. That way we don't need special behavior in the dumper.
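[Editor's sketch] A sketch of the "construct the name once" behavior, shaped like a Flink reporter callback. The getMetricIdentifier() helper is assumed here as the name-assembly mechanism; per Chesnay's proposal, the assembly would actually be hard-coded in the ComponentMetricGroups:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;

// The fully scoped name is assembled exactly once, at registration time;
// the periodic dump then only reads cached names and current values.
final class DumperSketch {
    private final Map<Metric, String> names = new ConcurrentHashMap<>();

    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        names.put(metric, group.getMetricIdentifier(metricName)); // assumed helper
    }

    void notifyOfRemovedMetric(Metric metric) {
        names.remove(metric);
    }
}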
In reply to this post by Eron Wright
Hi Eron!
Some comments on your comments:

*Dispatcher*
- The dispatcher should NOT be job-centric. The dispatcher should take over the "multi-job" responsibilities here, now that the JobManager is single-job only.
- An abstract dispatcher would be great. It could implement the connection/HTTP elements and have an abstract method to start a job (a sketch of a specialization follows after this message):
  -> Yarn: use the YarnClusterClient to start a Yarn job
  -> Mesos: same thing
  -> Standalone: spawn a JobManager

*Client*
This is an interesting point. Max is currently refactoring the clients into:
- a Cluster Client (with specializations for Yarn and Standalone) to launch jobs and control a cluster (yarn session, ...)
- a Job Client, which is connected to a single job and can issue commands to that job (cancel/stop/checkpoint/savepoint/change-parallelism)

Let's try and get his input on this.

*RM*
Agreed - the base RM is "stateless"; specialized RMs can behave differently if they need to. RM fencing must be generic - all cluster types can suffer from orphaned tasks (Yarn as well, I think).

*User Code*
I think in the cases where processes/containers are launched per-job, this should always be feasible. It is a nice optimization that I think we should do wherever possible; it makes users' lives with respect to classloading much easier. Some cases with custom class loading are currently tough in Flink - that way, those jobs would at least run in the yarn/mesos individual-job mode (not the session mode, though; that one still needs dynamic class loading).

*Standalone Security*
That is a known limitation and fine for now, I think. Whoever wants proper security needs to go to Yarn/Mesos initially. Standalone v2.0 may change that.

Greetings,
Stephan
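[Editor's sketch] Continuing the abstract Dispatcher sketch from earlier in the thread, the Yarn specialization might look like this; YarnJobLauncher is a made-up stand-in for the YarnClusterClient interaction:

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;

// Hypothetical per-cluster specialization of the abstract Dispatcher above.
final class YarnDispatcher extends Dispatcher {

    /** Illustrative stand-in for launching a per-job Yarn application. */
    interface YarnJobLauncher {
        JobID startYarnJob(JobGraph job) throws Exception;
    }

    private final YarnJobLauncher launcher;

    YarnDispatcher(YarnJobLauncher launcher) {
        this.launcher = launcher;
    }

    @Override
    protected JobID startJob(JobGraph job) throws Exception {
        return launcher.startYarnJob(job); // one Yarn application per job
    }
}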
In reply to this post by Aljoscha Krettek-2
@Aljoscha
I would not make the ResourceManager a subcomponent of the JobManager. While that may be simpler initially, I would like to keep the door open to letting the RM and JM run in different processes/nodes. Also, for Yarn/Mesos sessions, the ResourceManager may run longer than the JobManager.
In reply to this post by Stephan Ewen
Let me rephrase my comment on the dispatcher. I mean that its API would be job-centric, i.e. with operations like `execute(jobspec)` rather than operations like `createSession` that the status quo would suggest (a sketch follows below).

Since writing those comments I've put more time into developing the Mesos dispatcher with FLIP-6 in mind. I see that Till is spinning up an effort too, so we should all sync up in the near future.

Eron
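To make the abstract-dispatcher idea above concrete, here is a minimal sketch of how the shared frontend and the cluster-specific backends could be split. All class and method names are hypothetical illustrations, not taken from FLIP-6 or the Flink code base:

    import java.nio.file.Path;
    import java.util.List;
    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    // Hypothetical sketch: shared submission handling lives in the base
    // class; how a JobManager is brought up is cluster-specific.
    public abstract class AbstractDispatcher {

        // Shared frontend path (in a real system: HTTP endpoint, artifact
        // staging, validation, ...).
        public final JobID submitJob(JobGraph job, List<Path> artifacts) throws Exception {
            return startJob(job, artifacts);
        }

        // Cluster-specific: start a JobManager for this job.
        protected abstract JobID startJob(JobGraph job, List<Path> artifacts) throws Exception;
    }

    // Standalone variant: spawn an in-process JobManager for the job.
    class StandaloneDispatcher extends AbstractDispatcher {
        @Override
        protected JobID startJob(JobGraph job, List<Path> artifacts) {
            // start a local JobManager actor and hand it the job graph
            return job.getJobID();
        }
    }

    // A Yarn or Mesos variant would instead delegate to a cluster client
    // that launches a per-job application master.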
In reply to this post by Kurt Young
@Kurt
You raise some good points. These are tricky issues indeed. Here are some thoughts:

(1) I think the resources required for a function can only be decided by the user (at least in a first version). If I recall correctly, Blink used annotations on the user code (on Yarn) to define how many resources a function should require. For all cases where no such annotations are set, I think we should interpret that as "no special requirements" and request slots of a default size.
  - For standalone, the size of a slot is simply determined by the size of the TaskManager process, divided by the number of slots.
  - For Yarn, I think we need to ask for a default container size, similar to what we do in the current version (through -ym and other flags).

(2) Slot sharing at the level of SlotSharingGroup and CoLocationConstraint is something that I would like to keep out of the ResourceManager/SlotPool/etc. These concepts may actually go away in the future (I would definitely like to remove the CoLocationConstraint once we have cleaned up a few things in the iterations code).

The ResourceManager would think about combining slots into containers (i.e., allocating multi-slot containers). It could, for example, allocate a 2-vcore container with 10 slots of 0.2 vcores each. In that sense, the best way to think about a slot is as the unit that the scheduler independently allocates and releases.

Greetings,
Stephan

On Mon, Aug 1, 2016 at 3:44 AM, Kurt Young <[hidden email]> wrote:
> Thanks for the great proposal.
>
> There are still two issues I am concerned with and want to discuss.
>
> #1 Who should decide the resources one operator uses, the user or the
> framework? Things like how much CPU or memory my "map" operator costs seem
> a little too low-level for users - should we expose some APIs for these?
>
> #2 Who decides how to combine slots into a real container in Yarn and
> Mesos mode? Currently, Flink has an optimization for resource utilization
> called SlotSharingGroup. It takes effect before Flink allocates resources:
> we combine as many operators as we can into one single *SharedSlot* (which
> I think is still a slot). Since all the combination and optimization is
> done before we allocate resources, should we distinguish between slots and
> containers (if we want to introduce this concept - I think it is needed by
> standalone mode)? If the answer is yes, it leads us to a situation where
> both the JobManager and the ResourceManager know how to utilize resources.
> For logic like SlotSharingGroup, it is more appropriate to let the
> Scheduler handle it, because the Scheduler has a lot of information about
> the JobGraph and the constraints on it. But for other logic which is more
> purely resource-aware or cluster-specific, we may consider letting the
> ResourceManager handle it. E.g., there are some limitations on Yarn's
> allocation: we can only allocate containers with "integer" vcores, so it
> is not possible for us to have 0.1 or 0.2 vcores for now. We have bypassed
> this by combining some operators into one slot; otherwise it would cause a
> waste of resources. But I think it is better if we can make only one role
> aware of all the resource utilization.
>
> Thanks
> Kurt |
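To illustrate the annotation idea in (1): a minimal sketch of how per-function resource requirements could be declared and defaulted. This is a hypothetical API - the @Resources annotation, the default values, and the helper are invented for illustration, not Blink's or Flink's actual interfaces:

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    // Hypothetical annotation for declaring a function's resource needs.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Resources {
        double cpuCores() default 0.0; // 0.0 means "no special requirements"
        int memoryMb() default 0;
    }

    // A user function that declares explicit requirements.
    @Resources(cpuCores = 0.5, memoryMb = 512)
    class HeavyMapFunction { /* map logic */ }

    class ResourceProfiles {
        static final double DEFAULT_CPU = 1.0;     // assumed default slot size
        static final int DEFAULT_MEMORY_MB = 1024;

        // Unannotated functions fall back to the default profile.
        static double cpuCoresFor(Class<?> function) {
            Resources r = function.getAnnotation(Resources.class);
            return (r == null || r.cpuCores() == 0.0) ? DEFAULT_CPU : r.cpuCores();
        }
    }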
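For the container-packing point in (2), the arithmetic the ResourceManager would do is straightforward if slot sizes are kept in integer milli-vcores (avoiding floating-point rounding); the numbers below just mirror the 2-vcore/0.2-vcore example:

    class SlotPacking {
        // Yarn grants whole vcores; slots may want a fraction of one.
        // A 2-vcore container with 200-milli-vcore (0.2 vcore) slots
        // holds 2000 / 200 = 10 slots.
        static int slotsPerContainer(int containerVcores, int slotMilliVcores) {
            return (containerVcores * 1000) / slotMilliVcores;
        }

        public static void main(String[] args) {
            System.out.println(slotsPerContainer(2, 200)); // prints 10
        }
    }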
In reply to this post by Eron Wright
Hi Eron!
As per our separate discussion, the main thing here is a confusion about what a session is.

I would keep the initial API of the dispatcher to just "launchJob(JobGraph, artifacts)". It would simply start individual jobs in the cluster.

If we want to make the dispatcher "resource session" aware, we need to decide between two options, in my opinion (a rough sketch contrasting them follows below):
  - The dispatcher simply launches the session's core process (ResourceManager and SessionJobManager) and returns the session's endpoint info. The SessionExecutionEnvironment then uses a client that communicates directly with the SessionJobManager and sends it new jobs.
  - The dispatcher is the gateway for all interactions with that session, meaning it forwards the calls from the SessionExecutionEnvironment to the SessionJobManager.

My gut feeling is that session handling is sufficiently separate from the "dispatch/start individual job" functionality that it would not hurt to focus only on the latter in the first iteration.

Greetings,
Stephan |
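A hypothetical sketch contrasting the two shapes the dispatcher API could take - the interface and method names are illustrative only, not from the FLIP:

    import java.net.InetSocketAddress;
    import java.nio.file.Path;
    import java.util.List;
    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    // First iteration: individual jobs only.
    interface Dispatcher {
        JobID launchJob(JobGraph job, List<Path> artifacts) throws Exception;
    }

    // Option 1: the dispatcher only bootstraps the session and returns its
    // endpoint; the SessionExecutionEnvironment then talks to the
    // SessionJobManager directly.
    interface SessionLaunchingDispatcher extends Dispatcher {
        InetSocketAddress launchSession(String sessionName) throws Exception;
    }

    // Option 2: the dispatcher stays in the loop as a gateway, forwarding
    // every session interaction to the SessionJobManager.
    interface SessionGatewayDispatcher extends Dispatcher {
        String createSession(String sessionName) throws Exception; // returns a session id
        JobID submitToSession(String sessionId, JobGraph job) throws Exception;
        void closeSession(String sessionId) throws Exception;
    }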