Hi all,
Following the discussion about introducing a Flink JobClient API[1], we have drafted FLIP-74[2] to gather thoughts and work towards a standard set of public user-facing interfaces.

This discussion thread aims at standardizing the job-level client API. But I'd like to emphasize that how to retrieve a JobClient will likely trigger further discussion on the different levels of clients exposed by Flink, so a follow-up thread will be started later to coordinate FLIP-73 and FLIP-74 on the exposure issue.

Looking forward to your opinions.

Best,
tison.

[1] https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
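[Editor's note] To make the proposal concrete for readers of the archive, here is a minimal Java sketch of what a job-level client could look like. All names and signatures below are illustrative assumptions, not the actual FLIP-74 API:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only: names approximate the idea of a job-level
// client where every operation is asynchronous.
public class JobClientSketch {
    enum JobStatus { RUNNING, FINISHED, CANCELED }

    // A job-level client bound to a single job.
    interface JobClient {
        String getJobId();
        CompletableFuture<JobStatus> getJobStatus();
        CompletableFuture<Void> cancel();
    }

    // Trivial in-memory stub, just to show the asynchronous call style.
    static class StubJobClient implements JobClient {
        private volatile JobStatus status = JobStatus.RUNNING;
        public String getJobId() { return "stub-job"; }
        public CompletableFuture<JobStatus> getJobStatus() {
            return CompletableFuture.completedFuture(status);
        }
        public CompletableFuture<Void> cancel() {
            status = JobStatus.CANCELED;
            return CompletableFuture.completedFuture(null);
        }
    }

    public static void main(String[] args) {
        JobClient client = new StubJobClient();
        // Callers decide whether to block (join) or stay asynchronous.
        System.out.println(client.getJobStatus().join()); // RUNNING
        client.cancel().join();
        System.out.println(client.getJobStatus().join()); // CANCELED
    }
}
```

The key property is that the interface exposes only job-scoped operations and futures, leaving the blocking decision to the caller.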
Hi Tison,
Thanks for proposing the document! I had some comments on it.

I think the only complex thing that we still need to figure out is how to get a JobClient for a job that is already running, as you mentioned in the document. Currently I'm thinking that it's OK to add a method to Executor for retrieving a JobClient for a running job by providing an ID. Let's see what Kostas has to say on the topic.

Best,
Aljoscha
Hi Tison,
Thanks for the FLIP and for launching the discussion!

As a first note, a big +1 on providing/exposing a JobClient to the users!

Some points that would be nice to have clarified:

1) You mention that we can get rid of the DETACHED mode. I agree that at a high level, given that everything will now be asynchronous, there is no need to keep the DETACHED mode, but I think we should specify some aspects. For example, without the explicit separation of the modes, what happens when the job finishes? Does the client always poll periodically for the result, or is the result pushed when in NON-DETACHED mode? What happens if the client disconnects and reconnects?

2) On "how to retrieve a JobClient for a running job", I think this is related to the other discussion you opened on the ML about multi-layered clients. First of all, I agree that exposing different "levels" of clients would be a nice addition, and there have actually been some discussions about doing so in the future. Now for this specific discussion:
  i) I do not think that we should expose the ClusterDescriptor/ClusterSpecification to the user, as this ties us to a specific architecture which may change in the future.
  ii) I do not think it should be the Executor that provides a JobClient for an already running job (only for the jobs that it submits). The job of the executor should just be to execute() a pipeline.
  iii) I think a solution that respects the separation of concerns could be the addition of another component (in the future), something like a ClientFactory or ClusterFactory, with methods like ClusterClient createCluster(Configuration), JobClient retrieveJobClient(Configuration, JobId), maybe even (although I am not sure) Executor getExecutor(Configuration), and maybe more. This component would be responsible for interacting with a cluster manager like Yarn and would do what is now being done by the ClusterDescriptor, plus some more.
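[Editor's note] Point iii) can be sketched as a small Java interface plus a trivial in-memory stand-in. Everything here is hypothetical (the types are simplified placeholders, not real Flink classes):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of point iii): one factory component owns all cluster interaction.
// All types are simplified stand-ins for illustration only.
public class ClientFactorySketch {
    static class Configuration {}
    static class ClusterClient {}
    static class JobClient {
        final String jobId;
        JobClient(String jobId) { this.jobId = jobId; }
    }

    // The proposed component: creates clusters and attaches to running jobs.
    interface ClientFactory {
        ClusterClient createCluster(Configuration conf);
        JobClient retrieveJobClient(Configuration conf, String jobId);
    }

    // In-memory stand-in for what a Yarn-backed factory might do.
    static class StubClientFactory implements ClientFactory {
        private final Map<String, JobClient> running = new HashMap<>();
        public ClusterClient createCluster(Configuration conf) {
            return new ClusterClient();
        }
        public JobClient retrieveJobClient(Configuration conf, String jobId) {
            // Attach to an already running job by its id.
            return running.computeIfAbsent(jobId, JobClient::new);
        }
    }

    public static void main(String[] args) {
        ClientFactory factory = new StubClientFactory();
        JobClient client = factory.retrieveJobClient(new Configuration(), "job-42");
        System.out.println(client.jobId); // job-42
    }
}
```

The design point is that retrieval of a JobClient for an existing job lives in the factory, not in the Executor, keeping execute() as the Executor's only concern.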
Although under the hood all these abstractions (Environments, Executors, ...) use the same clients, I believe their existence is not contradictory: they simply hide some of the complexity from the user and give us, as developers, some freedom to change some of the parts in the future. For example, the executor will take a Pipeline, create a JobGraph, and submit it, instead of requiring the user to do each step separately. This allows us, for example, to get rid of the Plan if in the future everything is DataStream. Essentially, I think of these as layers of an onion with the clients close to the core: the higher you go, the more functionality is included and hidden from the public eye.

Point iii), by the way, is just a thought and by no means final. I also like the idea of multi-layered clients, so this may spark up the discussion.

Cheers,
Kostas
Thanks for your replies, Kostas & Aljoscha!

Below are replies, point by point.

1. For DETACHED mode, what I said there is about the DETACHED mode on the client side. There are two configuration contexts that overload the term DETACHED[1].

On the client side, it controls whether client.submitJob blocks until the job execution result is available. Since client.submitJob returns CompletableFuture<JobClient>, NON-DETACHED has no effect at all: the caller of submitJob decides whether or not to block to get the JobClient and request the job execution result. If the client crashes, it is a user-scope exception that should be handled in user code; if the client loses its connection to the cluster, we have retry-count and retry-interval configurations that retry automatically and throw a user-scope exception when the retries are exceeded.

Your comment about polling for the result versus pushing it sounds like a concern on the cluster side.

On the cluster side, DETACHED mode only exists in JobCluster. If DETACHED is configured, the JobCluster exits when the job finishes; if NON-DETACHED is configured, the JobCluster exits when the job execution result has been delivered. FLIP-74 does not propose changes in this scope; the behavior stays as it is.

However, this is an interesting area where we could revisit the implementation a bit.

<see the next email for a compact reply on this point>

2. The retrieval of JobClient is so important that if we don't have a way to retrieve a JobClient, it is a dumb public user-facing interface (what a strange state :P).

About the retrieval of JobClient, as mentioned in the document, two ways should be supported:

(1) Retrieved as the return type of job submission.
(2) Retrieve a JobClient of an existing job (by job id).

I highly respect your thoughts about how Executors should work and on multi-layered clients. Although (2) is not supported by public interfaces per the summary of the discussion above, we can discuss the place of Executors among multi-layered clients and find a way to retrieve the JobClient of an existing job with a public client API. I will comment in the FLIP-73 thread[2] since it is mostly about Executors.

Best,
tison.
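[Editor's note] The argument in point 1, that NON-DETACHED loses its meaning once submitJob is asynchronous, can be shown with a small sketch. submitJob and getJobExecutionResult here are stand-ins, not the real Flink ClusterClient signatures:

```java
import java.util.concurrent.CompletableFuture;

// Once submission returns a future, the caller (not a mode flag)
// decides whether to block for the result.
public class AsyncSubmitSketch {
    static class JobClient {
        CompletableFuture<String> getJobExecutionResult() {
            return CompletableFuture.completedFuture("result");
        }
    }

    // Stand-in for an asynchronous client.submitJob(...).
    static CompletableFuture<JobClient> submitJob() {
        return CompletableFuture.completedFuture(new JobClient());
    }

    public static void main(String[] args) {
        // "Detached" style: fire and forget, never join the future.
        CompletableFuture<JobClient> detached = submitJob();

        // "Attached" style: the caller chooses to block all the way
        // through to the job execution result.
        String result = submitJob()
            .thenCompose(JobClient::getJobExecutionResult)
            .join();
        System.out.println(result); // result
    }
}
```

Both styles go through the same API; the DETACHED/NON-DETACHED distinction collapses into a caller-side choice of whether to join.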
[1] https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8
[2] https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E
About JobCluster
Actually, I am not quite sure what we gain from the DETACHED configuration on the cluster side. We don't in fact have a NON-DETACHED JobCluster in our codebase, right?

This brings me to one major question we have to answer first:

*What JobCluster conceptually is, exactly*

Related discussion can be found in JIRA[1] and on the mailing list[2]. Stephan gives a nice description of JobCluster:

  Two things to add:
  - The job mode is very nice in the way that it runs the client inside the cluster (in the same image/process that is the JM) and thus unifies both applications and what the Spark world calls the "driver mode".
  - Another thing I would add is that during the FLIP-6 design, we were thinking about setups where Dispatcher and JobManager are separate processes. A Yarn or Mesos Dispatcher of a session could run independently (even as privileged processes executing no code). Then the "per-job" mode could still be helpful: when a job is submitted to the dispatcher, it launches the JM again in a per-job mode, so that JM and TM processes are bound to the job only. For higher security setups, it is important that processes are not reused across jobs.

However, currently in "per-job" mode we generate the JobGraph on the client side, launch the JobCluster, and retrieve the JobGraph there for execution. So actually, we don't "run the client inside the cluster".

Besides, referring to the discussion with Till[1], it would be helpful if we followed the same process as session mode for "per-job" mode from the user's perspective: we would not use OptimizedPlanEnvironment to create the JobGraph, but directly deploy the Flink cluster in env.execute.

Generally, two points:

1. Run the Flink job by invoking the user main method and executing it throughout, instead of creating the JobGraph from the main class.
2. Run the client inside the cluster.

If 1 and 2 are implemented, there is obviously no need for DETACHED mode on the cluster side, because we just shut down the cluster on the exit of the client running inside the cluster.
Whether or not the result is delivered is up to user code.

[1] https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388
[2] https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E
A correction: change

/we just shut down the cluster on the exit of the client running inside the cluster/

to

we just shut down the cluster on both the exit of the client running inside the cluster and the finish of the job. Since the client runs inside the cluster, we can easily wait for both in ClusterEntrypoint.
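[Editor's note] "Wait for both" maps naturally onto CompletableFuture.allOf. A minimal sketch of what a hypothetical ClusterEntrypoint could do (ClusterEntrypoint's real shutdown logic is more involved; this only illustrates the ordering):

```java
import java.util.concurrent.CompletableFuture;

// Shut down only when the embedded client has exited AND the job finished.
public class ShutdownSketch {
    public static void main(String[] args) {
        CompletableFuture<Void> clientExited = new CompletableFuture<>();
        CompletableFuture<Void> jobFinished = new CompletableFuture<>();

        CompletableFuture<Void> shutdown =
            CompletableFuture.allOf(clientExited, jobFinished)
                .thenRun(() -> System.out.println("shutting down cluster"));

        jobFinished.complete(null);   // job result delivered
        // The cluster stays up here: the embedded client is still running.
        clientExited.complete(null);  // user main() returned
        shutdown.join();              // now the cluster shuts down
    }
}
```

Completing either future alone does not trigger shutdown; only the conjunction of the two does, which is exactly the corrected semantics above.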
Hi all,
just a remark about the Flink REST API (and its client as well): almost all the time we need a way to dynamically know which jobs are contained in a jar file, and this could be exposed by the REST endpoint under /jars/:jarid/entry-points (a simple way to implement this would be to check the value of Main-class or Main-classes inside the Manifest of the jar, if they exist [1]).

I understand that this is something that is not strictly required to execute Flink jobs, but IMHO it would ease A LOT the work of UI developers, who would have a way to show users all available jobs inside a jar plus their configurable parameters.

For example, right now in the WebUI you can upload a jar and then you have to set (without any autocomplete or UI support) the main class and its params (for example using a string like --param1 xx --param2 yy). Adding this functionality to the REST API and the respective client would enable the WebUI (and all UIs interacting with a Flink cluster) to prefill a dropdown list containing the entry-point classes (i.e. Flink jobs) and, once one is selected, their required (typed) parameters.

Best,
Flavio

[1] https://issues.apache.org/jira/browse/FLINK-10864

On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]> wrote:
> [quoted thread trimmed] |
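The manifest check Flavio suggests could look roughly like this. Note that `Main-Classes` is an attribute proposed in FLINK-10864, not a standard jar manifest attribute, and the class name here is a hypothetical sketch:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Manifest;

// Sketch of the check behind the proposed /jars/:jarid/entry-points
// endpoint: read Main-Class (or the proposed, non-standard "Main-Classes"
// attribute) from a jar manifest.
public class EntryPointSketch {

    static String entryPoint(Manifest manifest) {
        // Prefer the proposed multi-job attribute, fall back to Main-Class.
        String multi = manifest.getMainAttributes().getValue("Main-Classes");
        if (multi != null) {
            return multi;
        }
        return manifest.getMainAttributes().getValue("Main-Class");
    }

    public static void main(String[] args) throws IOException {
        // Manifest lines are CRLF-terminated and end with a blank line.
        String raw = "Manifest-Version: 1.0\r\nMain-Class: org.example.WordCount\r\n\r\n";
        Manifest manifest =
                new Manifest(new ByteArrayInputStream(raw.getBytes(StandardCharsets.UTF_8)));
        System.out.println(entryPoint(manifest)); // org.example.WordCount
    }
}
```

In a real endpoint the `Manifest` would come from `new JarFile(uploadedFile).getManifest()` rather than from a string.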
Hi Flavio,
I agree that this would be good to have, but I also think that it is outside the scope of FLIP-74: it is an orthogonal feature.

Best,
Aljoscha

> On 27. Sep 2019, at 10:31, Flavio Pompermaier <[hidden email]> wrote:
> [quoted thread trimmed] |
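Beneath the DETACHED discussion in this thread sits a single asynchronous submission model: `submitJob` returns a `CompletableFuture`, and the caller (not a DETACHED/NON-DETACHED switch) decides whether to block. A toy illustration, with stand-in types rather than the real Flink client interfaces:

```java
import java.util.concurrent.CompletableFuture;

// Illustration of the asynchronous submission model: the same API serves
// both styles, because blocking is the caller's choice. The String job id
// stands in for the real JobClient type.
public class SubmitSketch {

    static CompletableFuture<String> submitJob(String jobName) {
        // Pretend submission completes immediately with a job id.
        return CompletableFuture.completedFuture("job-id-for-" + jobName);
    }

    public static void main(String[] args) throws Exception {
        // "Detached" style: fire and forget, just keep the future around.
        CompletableFuture<String> pending = submitJob("wordcount");

        // "Attached" style: the same API, the caller simply blocks.
        String jobId = pending.get();
        System.out.println(jobId); // job-id-for-wordcount
    }
}
```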
Hi Zili,
thanks for working on this topic. I just read through the FLIP and I have two questions:

* Should we add "cancelWithSavepoint" to a new public API when we have deprecated the corresponding REST API/CLI methods? In my understanding there is no reason to use it anymore.
* Should we call "stopWithSavepoint" simply "stop", as "stop" always performs a savepoint?

Best,
Konstantin

On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek <[hidden email]> wrote:
> [quoted thread trimmed]
> >>>>>>> > >>>>>>> [1] > >>>>>>> > >>>>> > >> > https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E > >>>>>>> [2] > >>>>>>> > >>>>> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API > >>>>>> > >>>>> > >>>> > > -- Konstantin Knauf | Solutions Architect +49 160 91394525 Follow us @VervericaData Ververica <https://www.ververica.com/> -- Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng |
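[Editor's note] The client-side contract tison describes above — submitJob returning a CompletableFuture<JobClient>, so that DETACHED vs. NON-DETACHED collapses into whether the caller chooses to block — can be sketched as follows. This is a minimal mock for illustration, not Flink's actual API; the JobClient methods and the stubbed submission are assumptions.

```java
import java.util.concurrent.CompletableFuture;

public class JobClientSketch {

    // Hypothetical job-level client, per the FLIP-74 discussion above.
    interface JobClient {
        String getJobId();
        CompletableFuture<String> getJobExecutionResult();
    }

    // Stubbed submission: in a real cluster this future would complete
    // once the job is accepted, not immediately.
    static CompletableFuture<JobClient> submitJob(String jobName) {
        JobClient client = new JobClient() {
            public String getJobId() { return "job-" + jobName; }
            public CompletableFuture<String> getJobExecutionResult() {
                return CompletableFuture.completedFuture("FINISHED");
            }
        };
        return CompletableFuture.completedFuture(client);
    }

    public static void main(String[] args) throws Exception {
        // "Detached" style: submit and walk away -- nobody blocks.
        CompletableFuture<JobClient> submission = submitJob("wordcount");

        // "Attached" style: the caller opts in to blocking on the result.
        JobClient client = submission.get();
        String result = client.getJobExecutionResult().get();
        System.out.println(client.getJobId() + " -> " + result);
    }
}
```

The point of the sketch is that no mode flag is needed: both behaviors fall out of the same asynchronous return type, which is exactly why tison argues NON-DETACHED "is no power at all" on the client side.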
I did not realize there was a plan to deprecate anything in the REST API?
The REST API is super important for tooling written in non-JVM languages
that does not include a Flink client (like FlinkK8sOperator). The REST API
should continue to support all job management operations, including job
submission.

Thomas

On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf <[hidden email]> wrote:

> Hi Zili,
>
> thanks for working on this topic. I just read through the FLIP and I have
> two questions:
>
> * should we add "cancelWithSavepoint" to a new public API when we have
> deprecated the corresponding REST API/CLI methods? In my understanding
> there is no reason to use it anymore.
> * should we call "stopWithSavepoint" simply "stop", as "stop" always
> performs a savepoint?
>
> Best,
>
> Konstantin
>
> On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek <[hidden email]> wrote:
>
> > Hi Flavio,
> >
> > I agree that this would be good to have. But I also think that this is
> > outside the scope of FLIP-74; I think it is an orthogonal feature.
> >
> > Best,
> > Aljoscha
> >
> > > On 27. Sep 2019, at 10:31, Flavio Pompermaier <[hidden email]> wrote:
> > >
> > > Hi all,
> > > just a remark about the Flink REST APIs (and their client as well):
> > > almost all the time we need a way to dynamically know which jobs are
> > > contained in a jar file, and this could be exposed by the REST endpoint
> > > under /jars/:jarid/entry-points (a simple way to implement this would
> > > be to check the value of Main-class or Main-classes inside the Manifest
> > > of the jar, if they exist [1]).
> > >
> > > I understand that this is something that is not strictly required to
> > > execute Flink jobs, but IMHO it would ease A LOT the work of UI
> > > developers, who would then have a way to show users all available jobs
> > > inside a jar plus their configurable parameters.
> > > For example, right now in the WebUI you can upload a jar and then you
> > > have to set (without any autocomplete or UI support) the main class and
> > > its params (for example using a string like --param1 xx --param2 yy).
> > > Adding this functionality to the REST API and the respective client
> > > would enable the WebUI (and all UIs interacting with a Flink cluster)
> > > to prefill a dropdown list containing the list of entry-point classes
> > > (i.e. Flink jobs) and, once selected, their required (typed) parameters.
> > >
> > > Best,
> > > Flavio
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-10864
> > >
> > > On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]> wrote:
> > >
> > >> modify
> > >>
> > >> /we just shutdown the cluster on the exit of client that running inside
> > >> cluster/
> > >>
> > >> to
> > >>
> > >> we just shut down the cluster on both the exit of the client running
> > >> inside the cluster and the finish of the job.
> > >> Since the client is running inside the cluster, we can easily wait for
> > >> both in ClusterEntrypoint.
> > >>
> > >> On Fri, Sep 27, 2019 at 3:13 PM, Zili Chen <[hidden email]> wrote:
> > >>
> > >>> About JobCluster
> > >>>
> > >>> Actually I am not quite sure what we gain from the DETACHED
> > >>> configuration on the cluster side.
> > >>> We don't have a NON-DETACHED JobCluster in our codebase, right?
> > >>>
> > >>> It occurs to me that there is one major question we have to answer
> > >>> first:
> > >>>
> > >>> *What JobCluster conceptually is, exactly*
> > >>>
> > >>> Related discussion can be found in JIRA[1] and on the mailing
> > >>> list[2]. Stephan gives a nice description of JobCluster:
> > >>>
> > >>> Two things to add:
> > >>> - The job mode is very nice in the way that it runs the client inside
> > >>> the cluster (in the same image/process that is the JM) and thus
> > >>> unifies both applications and what the Spark world calls the "driver
> > >>> mode".
> > >>> - Another thing I would add is that during the FLIP-6 design, we were
> > >>> thinking about setups where Dispatcher and JobManager are separate
> > >>> processes. A Yarn or Mesos Dispatcher of a session could run
> > >>> independently (even as privileged processes executing no code). Then
> > >>> the "per-job" mode could still be helpful: when a job is submitted to
> > >>> the dispatcher, it launches the JM again in a per-job mode, so that
> > >>> JM and TM processes are bound to the job only. For higher security
> > >>> setups, it is important that processes are not reused across jobs.
> > >>>
> > >>> However, currently in "per-job" mode we generate the JobGraph on the
> > >>> client side, launch the JobCluster and retrieve the JobGraph for
> > >>> execution. So actually, we don't "run the client inside the cluster".
> > >>>
> > >>> Besides, referring to the discussion with Till[1], it would be
> > >>> helpful if "per-job" mode followed the same process as session mode
> > >>> from the user perspective, i.e. we don't use OptimizedPlanEnvironment
> > >>> to create the JobGraph, but directly deploy the Flink cluster in
> > >>> env.execute.
> > >>>
> > >>> Generally, 2 points:
> > >>>
> > >>> 1. Run the Flink job by invoking the user main method and executing
> > >>> it throughout, instead of creating the JobGraph from the main class.
> > >>> 2. Run the client inside the cluster.
> > >>>
> > >>> If 1 and 2 are implemented, there is obviously no need for DETACHED
> > >>> mode on the cluster side, because we just shut down the cluster on
> > >>> the exit of the client running inside the cluster. Whether or not the
> > >>> result is delivered is up to user code.
> > >>>
> > >>> [1] https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388
> > >>> [2] https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E
> > >>>
> > >>> [...]
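[Editor's note] Flavio's proposed /jars/:jarid/entry-points endpoint quoted above boils down to reading the jar's manifest server-side. A sketch of that lookup using only the JDK follows; note that the endpoint name and the non-standard "Main-Classes" attribute are part of Flavio's proposal, not an existing Flink API — the sketch reads only the standard Main-Class attribute, and the jar it inspects is a temporary one it writes itself.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class EntryPointScan {

    // What the proposed REST handler would do: open the uploaded jar
    // and read the Main-Class attribute from its manifest, if present.
    static String mainClassOf(File jar) throws IOException {
        try (JarFile jf = new JarFile(jar)) {
            Manifest m = jf.getManifest();
            if (m == null) return null;
            return m.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }

    // Helper for the demo: write a minimal jar carrying a Main-Class entry.
    static File writeJar(String mainClass) throws IOException {
        File f = File.createTempFile("demo", ".jar");
        Manifest m = new Manifest();
        // Manifest-Version must be set or the other attributes are dropped.
        m.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        m.getMainAttributes().put(Attributes.Name.MAIN_CLASS, mainClass);
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(f), m)) {
            // manifest only; no class entries needed for the demo
        }
        return f;
    }

    public static void main(String[] args) throws IOException {
        File jar = writeJar("org.example.WordCount");
        System.out.println(mainClassOf(jar));  // prints the stored main class
    }
}
```

Listing job parameters, the second half of Flavio's suggestion, would need additional metadata beyond the manifest (FLINK-10864 discusses options).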
Hi Thomas,
maybe there is a misunderstanding. There is no plan to deprecate anything in the REST API in the process of introducing the JobClient API, and it shouldn't. Since "cancel with savepoint" was already deprecated in the REST API and CLI, I am just raising the question whether to add it to the JobClient API in the first place. Best, Konstantin On Mon, Sep 30, 2019 at 1:16 AM Thomas Weise <[hidden email]> wrote: > I did not realize there was a plan to deprecate anything in the REST API? > > The REST API is super important for tooling written in non JVM languages, > that does not include a Flink client (like FlinkK8sOperator). The REST API > should continue to support all job management operations, including job > submission. > > Thomas > > > On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf <[hidden email] > > > wrote: > > > Hi Zili, > > > > thanks for working on this topic. Just read through the FLIP and I have > two > > questions: > > > > * should we add "cancelWithSavepeoint" to a new public API, when we have > > deprecated the corresponding REST API/CLI methods? In my understanding > > there is no reason to use it anymore. > > * should we call "stopWithSavepoint" simply "stop" as "stop" always > > performs a savepoint? > > > > Best, > > > > Konstantin > > > > > > > > On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek <[hidden email]> > > wrote: > > > > > Hi Flavio, > > > > > > I agree that this would be good to have. But I also think that this is > > > outside the scope of FLIP-74, I think it is an orthogonal feature. > > > > > > Best, > > > Aljoscha > > > > > > > On 27. 
Sep 2019, at 10:31, Flavio Pompermaier <[hidden email]> > > > wrote: > > > > > > > > Hi all, > > > > just a remark about the Flink REST APIs (and its client as well): > > almost > > > > all the times we need a way to dynamically know which jobs are > > contained > > > in > > > > a jar file, and this could be exposed by the REST endpoint under > > > > /jars/:jarid/entry-points (a simple way to implement this would be to > > > check > > > > the value of Main-class or Main-classes inside the Manifest of the > jar > > if > > > > they exists [1]). > > > > > > > > I understand that this is something that is not strictly required to > > > > execute Flink jobs but IMHO it would ease A LOT the work of UI > > developers > > > > that could have a way to show the users all available jobs inside a > > jar + > > > > their configurable parameters. > > > > For example, right now in the WebUI, you can upload a jar and then > you > > > have > > > > to set (without any autocomplete or UI support) the main class and > > their > > > > params (for example using a string like --param1 xx --param2 yy). > > > > Adding this functionality to the REST API and the respective client > > would > > > > enable the WebUI (and all UIs interacting with a Flink cluster) to > > > prefill > > > > a dropdown list containing the list of entry-point classes (i.e. > Flink > > > > jobs) and, once selected, their required (typed) parameters. > > > > > > > > Best, > > > > Flavio > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-10864 > > > > > > > > On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]> > > wrote: > > > > > > > >> modify > > > >> > > > >> /we just shutdown the cluster on the exit of client that running > > inside > > > >> cluster/ > > > >> > > > >> to > > > >> > > > >> we just shutdown the cluster on both the exit of client that running > > > inside > > > >> cluster and the finish of job. 
> > > >> Since client is running inside cluster we can easily wait for the > end > > of > > > >> two both in ClusterEntrypoint. > > > >> > > > >> > > > >> Zili Chen <[hidden email]> 于2019年9月27日周五 下午3:13写道: > > > >> > > > >>> About JobCluster > > > >>> > > > >>> Actually I am not quite sure what we gains from DETACHED > > configuration > > > on > > > >>> cluster side. > > > >>> We don't have a NON-DETACHED JobCluster in fact in our codebase, > > right? > > > >>> > > > >>> It comes to me one major questions we have to answer first. > > > >>> > > > >>> *What JobCluster conceptually is exactly* > > > >>> > > > >>> Related discussion can be found in JIRA[1] and mailing list[2]. > > Stephan > > > >>> gives a nice > > > >>> description of JobCluster: > > > >>> > > > >>> Two things to add: - The job mode is very nice in the way that it > > runs > > > >> the > > > >>> client inside the cluster (in the same image/process that is the > JM) > > > and > > > >>> thus unifies both applications and what the Spark world calls the > > > "driver > > > >>> mode". - Another thing I would add is that during the FLIP-6 > design, > > we > > > >>> were thinking about setups where Dispatcher and JobManager are > > separate > > > >>> processes. A Yarn or Mesos Dispatcher of a session could run > > > >> independently > > > >>> (even as privileged processes executing no code). Then you the > > > "per-job" > > > >>> mode could still be helpful: when a job is submitted to the > > dispatcher, > > > >> it > > > >>> launches the JM again in a per-job mode, so that JM and TM > processes > > > are > > > >>> bound to teh job only. For higher security setups, it is important > > that > > > >>> processes are not reused across jobs. > > > >>> > > > >>> However, currently in "per-job" mode we generate JobGraph in client > > > side, > > > >>> launching > > > >>> the JobCluster and retrieve the JobGraph for execution. So > actually, > > we > > > >>> don't "run the > > > >>> client inside the cluster". 
> > > >>> > > > >>> Besides, refer to the discussion with Till[1], it would be helpful > we > > > >>> follow the same process > > > >>> of session mode for that of "per-job" mode in user perspective, > that > > we > > > >>> don't use > > > >>> OptimizedPlanEnvironment to create JobGraph, but directly deploy > > Flink > > > >>> cluster in env.execute. > > > >>> > > > >>> Generally 2 points > > > >>> > > > >>> 1. Running Flink job by invoke user main method and execute > > throughout, > > > >>> instead of create > > > >>> JobGraph from main-class. > > > >>> 2. Run the client inside the cluster. > > > >>> > > > >>> If 1 and 2 are implemented. There is obvious no need for DETACHED > > mode > > > in > > > >>> cluster side > > > >>> because we just shutdown the cluster on the exit of client that > > running > > > >>> inside cluster. Whether > > > >>> or not delivered the result is up to user code. > > > >>> > > > >>> [1] > > > >>> > > > >> > > > > > > https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388 > > > >>> [2] > > > >>> > > > >> > > > > > > https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E > > > >>> > > > >>> > > > >>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午2:13写道: > > > >>> > > > >>>> Thanks for your replies Kostas & Aljoscha! > > > >>>> > > > >>>> Below are replies point by point. > > > >>>> > > > >>>> 1. For DETACHED mode, what I said there is about the DETACHED mode > > in > > > >>>> client side. > > > >>>> There are two configurations overload the item DETACHED[1]. > > > >>>> > > > >>>> In client side, it means whether or not client.submitJob is > blocking > > > to > > > >>>> job execution result. > > > >>>> Due to client.submitJob returns CompletableFuture<JobClient> > > > >> NON-DETACHED > > > >>>> is no > > > >>>> power at all. 
Caller of submitJob makes the decision whether or > not > > > >>>> blocking to get the > > > >>>> JobClient and request for the job execution result. If client > > crashes, > > > >> it > > > >>>> is a user scope > > > >>>> exception that should be handled in user code; if client lost > > > connection > > > >>>> to cluster, we have > > > >>>> a retry times and interval configuration that automatically retry > > and > > > >>>> throws an user scope > > > >>>> exception if exceed. > > > >>>> > > > >>>> Your comment about poll for result or job result sounds like a > > concern > > > >> on > > > >>>> cluster side. > > > >>>> > > > >>>> In cluster side, DETACHED mode is alive only in JobCluster. If > > > DETACHED > > > >>>> configured, > > > >>>> JobCluster exits on job finished; if NON-DETACHED configured, > > > JobCluster > > > >>>> exits on job > > > >>>> execution result delivered. FLIP-74 doesn't stick to changes on > this > > > >>>> scope, it is just remained. > > > >>>> > > > >>>> However, it is an interesting part we can revisit this > > implementation > > > a > > > >>>> bit. > > > >>>> > > > >>>> <see the next email for compact reply in this one> > > > >>>> > > > >>>> 2. The retrieval of JobClient is so important that if we don't > have > > a > > > >> way > > > >>>> to retrieve JobClient it is > > > >>>> a dumb public user-facing interface(what a strange state :P). > > > >>>> > > > >>>> About the retrieval of JobClient, as mentioned in the document, > two > > > ways > > > >>>> should be supported. > > > >>>> > > > >>>> (1). Retrieved as return type of job submission. > > > >>>> (2). Retrieve a JobClient of existing job.(with job id) > > > >>>> > > > >>>> I highly respect your thoughts about how Executors should be and > > > >> thoughts > > > >>>> on multi-layered clients. 
> > > >>>> Although, (2) is not supported by public interfaces as summary of > > > >>>> discussion above, we can discuss > > > >>>> a bit on the place of Executors on multi-layered clients and find > a > > > way > > > >>>> to retrieve JobClient of > > > >>>> existing job with public client API. I will comment in FLIP-73 > > > thread[2] > > > >>>> since it is almost about Executors. > > > >>>> > > > >>>> Best, > > > >>>> tison. > > > >>>> > > > >>>> [1] > > > >>>> > > > >> > > > > > > https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8 > > > >>>> [2] > > > >>>> > > > >> > > > > > > https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E > > > >>>> > > > >>>> > > > >>>> > > > >>>> > > > >>>> Kostas Kloudas <[hidden email]> 于2019年9月25日周三 下午9:29写道: > > > >>>> > > > >>>>> Hi Tison, > > > >>>>> > > > >>>>> Thanks for the FLIP and launching the discussion! > > > >>>>> > > > >>>>> As a first note, big +1 on providing/exposing a JobClient to the > > > users! > > > >>>>> > > > >>>>> Some points that would be nice to be clarified: > > > >>>>> 1) You mention that we can get rid of the DETACHED mode: I agree > > that > > > >>>>> at a high level, given that everything will now be asynchronous, > > > there > > > >>>>> is no need to keep the DETACHED mode but I think we should > specify > > > >>>>> some aspects. For example, without the explicit separation of the > > > >>>>> modes, what happens when the job finishes. Does the client > > > >>>>> periodically poll for the result always or the result is pushed > > when > > > >>>>> in NON-DETACHED mode? What happens if the client disconnects and > > > >>>>> reconnects? > > > >>>>> > > > >>>>> 2) On the "how to retrieve a JobClient for a running Job", I > think > > > >>>>> this is related to the other discussion you opened in the ML > about > > > >>>>> multi-layered clients. 
First of all, I agree that exposing > > different > > > >>>>> "levels" of clients would be a nice addition, and actually there > > have > > > >>>>> been some discussions about doing so in the future. Now for this > > > >>>>> specific discussion: > > > >>>>> i) I do not think that we should expose the > > > >>>>> ClusterDescriptor/ClusterSpecification to the user, as this ties > us > > > to > > > >>>>> a specific architecture which may change in the future. > > > >>>>> ii) I do not think it should be the Executor that will > provide > > a > > > >>>>> JobClient for an already running job (only for the Jobs that it > > > >>>>> submits). The job of the executor should just be to execute() a > > > >>>>> pipeline. > > > >>>>> iii) I think a solution that respects the separation of > > concerns > > > >>>>> could be the addition of another component (in the future), > > something > > > >>>>> like a ClientFactory, or ClusterFactory that will have methods > > like: > > > >>>>> ClusterClient createCluster(Configuration), JobClient > > > >>>>> retrieveJobClient(Configuration , JobId), maybe even (although > not > > > >>>>> sure) Executor getExecutor(Configuration ) and maybe more. This > > > >>>>> component would be responsible to interact with a cluster manager > > > like > > > >>>>> Yarn and do what is now being done by the ClusterDescriptor plus > > some > > > >>>>> more stuff. > > > >>>>> > > > >>>>> Although under the hood all these abstractions (Environments, > > > >>>>> Executors, ...) underneath use the same clients, I believe their > > > >>>>> job/existence is not contradicting but they simply hide some of > the > > > >>>>> complexity from the user, and give us, as developers some freedom > > to > > > >>>>> change in the future some of the parts. For example, the executor > > > will > > > >>>>> take a Pipeline, create a JobGraph and submit it, instead of > > > requiring > > > >>>>> the user to do each step separately. 
This allows us to, for > > example, > > > >>>>> get rid of the Plan if in the future everything is DataStream. > > > >>>>> Essentially, I think of these as layers of an onion with the > > clients > > > >>>>> being close to the core. The higher you go, the more > functionality > > is > > > >>>>> included and hidden from the public eye. > > > >>>>> > > > >>>>> Point iii) by the way is just a thought and by no means final. I > > also > > > >>>>> like the idea of multi-layered clients so this may spark up the > > > >>>>> discussion. > > > >>>>> > > > >>>>> Cheers, > > > >>>>> Kostas > > > >>>>> > > > >>>>> On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek < > > > [hidden email]> > > > >>>>> wrote: > > > >>>>>> > > > >>>>>> Hi Tison, > > > >>>>>> > > > >>>>>> Thanks for proposing the document! I had some comments on the > > > >> document. > > > >>>>>> > > > >>>>>> I think the only complex thing that we still need to figure out > is > > > >> how > > > >>>>> to get a JobClient for a job that is already running. As you > > > mentioned > > > >> in > > > >>>>> the document. Currently I’m thinking that its ok to add a method > to > > > >>>>> Executor for retrieving a JobClient for a running job by > providing > > an > > > >> ID. > > > >>>>> Let’s see what Kostas has to say on the topic. > > > >>>>>> > > > >>>>>> Best, > > > >>>>>> Aljoscha > > > >>>>>> > > > >>>>>>> On 25. Sep 2019, at 12:31, Zili Chen <[hidden email]> > > wrote: > > > >>>>>>> > > > >>>>>>> Hi all, > > > >>>>>>> > > > >>>>>>> Summary from the discussion about introducing Flink JobClient > > > >> API[1] > > > >>>>> we > > > >>>>>>> draft FLIP-74[2] to > > > >>>>>>> gather thoughts and towards a standard public user-facing > > > >> interfaces. > > > >>>>>>> > > > >>>>>>> This discussion thread aims at standardizing job level client > > API. 
> > > >>>>> But I'd > > > >>>>>>> like to emphasize that > > > >>>>>>> how to retrieve JobClient possibly causes further discussion on > > > >>>>> different > > > >>>>>>> level clients exposed from > > > >>>>>>> Flink so that a following thread will be started later to > > > >> coordinate > > > >>>>>>> FLIP-73 and FLIP-74 on > > > >>>>>>> expose issue. > > > >>>>>>> > > > >>>>>>> Looking forward to your opinions. > > > >>>>>>> > > > >>>>>>> Best, > > > >>>>>>> tison. > > > >>>>>>> > > > >>>>>>> [1] > > > >>>>>>> > > > >>>>> > > > >> > > > > > > https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E > > > >>>>>>> [2] > > > >>>>>>> > > > >>>>> > > > >> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API > > > >>>>>> > > > >>>>> > > > >>>> > > > > > > > > > > -- > > > > Konstantin Knauf | Solutions Architect > > > > +49 160 91394525 > > > > > > Follow us @VervericaData Ververica <https://www.ververica.com/> > > > > > > -- > > > > Join Flink Forward <https://flink-forward.org/> - The Apache Flink > > Conference > > > > Stream Processing | Event Driven | Real Time > > > > -- > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > > > -- > > Ververica GmbH > > Registered at Amtsgericht Charlottenburg: HRB 158244 B > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji > > (Tony) Cheng > > > -- Konstantin Knauf | Solutions Architect +49 160 91394525 Follow us @VervericaData Ververica <https://www.ververica.com/> -- Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng |
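[Editor's note: the multi-layered "ClientFactory" idea in Kostas's point iii) above can be sketched in code. The following is a purely hypothetical illustration — the type names and method shapes (ClusterClient createCluster(Configuration), JobClient retrieveJobClient(Configuration, JobId)) follow his sketch and are not actual Flink APIs; Configuration is simplified to a plain Map, and an in-memory stub stands in for a real cluster manager such as Yarn, solely to make the call pattern visible.]

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical job-level handle; not a real Flink type.
interface JobClient {
    String getJobId();
    CompletableFuture<String> getJobStatus();
}

// Hypothetical cluster-level client; job submission is asynchronous.
interface ClusterClient {
    CompletableFuture<JobClient> submitJob(String jobGraph);
}

// The factory component from point iii): it talks to the cluster manager,
// covering what ClusterDescriptor does today plus JobClient retrieval.
interface ClusterFactory {
    // Deploy a new cluster from configuration.
    ClusterClient createCluster(Map<String, String> configuration);
    // Attach to an already running job without going through an Executor.
    JobClient retrieveJobClient(Map<String, String> configuration, String jobId);
}

// Toy in-memory stand-in, only to show the call pattern end to end.
class InMemoryClusterFactory implements ClusterFactory {
    private JobClient clientFor(final String jobId) {
        return new JobClient() {
            public String getJobId() { return jobId; }
            public CompletableFuture<String> getJobStatus() {
                return CompletableFuture.completedFuture("RUNNING");
            }
        };
    }

    public ClusterClient createCluster(Map<String, String> configuration) {
        return jobGraph -> CompletableFuture.completedFuture(
                clientFor("job-" + jobGraph.hashCode()));
    }

    public JobClient retrieveJobClient(Map<String, String> configuration, String jobId) {
        return clientFor(jobId);
    }
}
```

With a factory like this, attaching to an already running job does not involve an Executor at all, which matches the separation of concerns argued for above: the Executor only execute()s pipelines, while the factory owns cluster-manager interaction.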
Hi Konstantin,
* should we add "cancelWithSavepeoint" to a new public API, when we have deprecated the corresponding REST API/CLI methods? In my understanding there is no reason to use it anymore. Good point. We can exclude "cancelWithSavepoint" from public API at least for now, since it is deprecated already. Let's see if there is other concerns. * should we call "stopWithSavepoint" simply "stop" as "stop" always performs a savepoint? Well for naming issue I'm fine with that if it is a consensus of our community. I can see there is a "stop" CLI command which means "stop with savepoint". Best, tison. Konstantin Knauf <[hidden email]> 于2019年9月30日周一 下午12:16写道: > Hi Thomas, > > maybe there is a misunderstanding. There is no plan to deprecate anything > in the REST API in the process of introducing the JobClient API, and it > shouldn't. > > Since "cancel with savepoint" was already deprecated in the REST API and > CLI, I am just raising the question whether to add it to the JobClient API > in the first place. > > Best, > > Konstantin > > > > On Mon, Sep 30, 2019 at 1:16 AM Thomas Weise <[hidden email]> wrote: > > > I did not realize there was a plan to deprecate anything in the REST API? > > > > The REST API is super important for tooling written in non JVM languages, > > that does not include a Flink client (like FlinkK8sOperator). The REST > API > > should continue to support all job management operations, including job > > submission. > > > > Thomas > > > > > > On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf < > [hidden email] > > > > > wrote: > > > > > Hi Zili, > > > > > > thanks for working on this topic. Just read through the FLIP and I have > > two > > > questions: > > > > > > * should we add "cancelWithSavepeoint" to a new public API, when we > have > > > deprecated the corresponding REST API/CLI methods? In my understanding > > > there is no reason to use it anymore. > > > * should we call "stopWithSavepoint" simply "stop" as "stop" always > > > performs a savepoint? 
> > > > > > Best, > > > > > > Konstantin > > > > > > > > > > > > On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek <[hidden email] > > > > > wrote: > > > > > > > Hi Flavio, > > > > > > > > I agree that this would be good to have. But I also think that this > is > > > > outside the scope of FLIP-74, I think it is an orthogonal feature. > > > > > > > > Best, > > > > Aljoscha > > > > > > > > > On 27. Sep 2019, at 10:31, Flavio Pompermaier < > [hidden email]> > > > > wrote: > > > > > > > > > > Hi all, > > > > > just a remark about the Flink REST APIs (and its client as well): > > > almost > > > > > all the times we need a way to dynamically know which jobs are > > > contained > > > > in > > > > > a jar file, and this could be exposed by the REST endpoint under > > > > > /jars/:jarid/entry-points (a simple way to implement this would be > to > > > > check > > > > > the value of Main-class or Main-classes inside the Manifest of the > > jar > > > if > > > > > they exists [1]). > > > > > > > > > > I understand that this is something that is not strictly required > to > > > > > execute Flink jobs but IMHO it would ease A LOT the work of UI > > > developers > > > > > that could have a way to show the users all available jobs inside a > > > jar + > > > > > their configurable parameters. > > > > > For example, right now in the WebUI, you can upload a jar and then > > you > > > > have > > > > > to set (without any autocomplete or UI support) the main class and > > > their > > > > > params (for example using a string like --param1 xx --param2 yy). > > > > > Adding this functionality to the REST API and the respective client > > > would > > > > > enable the WebUI (and all UIs interacting with a Flink cluster) to > > > > prefill > > > > > a dropdown list containing the list of entry-point classes (i.e. > > Flink > > > > > jobs) and, once selected, their required (typed) parameters. 
> > > > > > > > > > Best, > > > > > Flavio > > > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-10864 > > > > > > > > > > On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]> > > > wrote: > > > > > > > > > >> modify > > > > >> > > > > >> /we just shutdown the cluster on the exit of client that running > > > inside > > > > >> cluster/ > > > > >> > > > > >> to > > > > >> > > > > >> we just shutdown the cluster on both the exit of client that > running > > > > inside > > > > >> cluster and the finish of job. > > > > >> Since client is running inside cluster we can easily wait for the > > end > > > of > > > > >> two both in ClusterEntrypoint. > > > > >> > > > > >> > > > > >> Zili Chen <[hidden email]> 于2019年9月27日周五 下午3:13写道: > > > > >> > > > > >>> About JobCluster > > > > >>> > > > > >>> Actually I am not quite sure what we gains from DETACHED > > > configuration > > > > on > > > > >>> cluster side. > > > > >>> We don't have a NON-DETACHED JobCluster in fact in our codebase, > > > right? > > > > >>> > > > > >>> It comes to me one major questions we have to answer first. > > > > >>> > > > > >>> *What JobCluster conceptually is exactly* > > > > >>> > > > > >>> Related discussion can be found in JIRA[1] and mailing list[2]. > > > Stephan > > > > >>> gives a nice > > > > >>> description of JobCluster: > > > > >>> > > > > >>> Two things to add: - The job mode is very nice in the way that it > > > runs > > > > >> the > > > > >>> client inside the cluster (in the same image/process that is the > > JM) > > > > and > > > > >>> thus unifies both applications and what the Spark world calls the > > > > "driver > > > > >>> mode". - Another thing I would add is that during the FLIP-6 > > design, > > > we > > > > >>> were thinking about setups where Dispatcher and JobManager are > > > separate > > > > >>> processes. A Yarn or Mesos Dispatcher of a session could run > > > > >> independently > > > > >>> (even as privileged processes executing no code). 
Then you the > > > > "per-job" > > > > >>> mode could still be helpful: when a job is submitted to the > > > dispatcher, > > > > >> it > > > > >>> launches the JM again in a per-job mode, so that JM and TM > > processes > > > > are > > > > >>> bound to teh job only. For higher security setups, it is > important > > > that > > > > >>> processes are not reused across jobs. > > > > >>> > > > > >>> However, currently in "per-job" mode we generate JobGraph in > client > > > > side, > > > > >>> launching > > > > >>> the JobCluster and retrieve the JobGraph for execution. So > > actually, > > > we > > > > >>> don't "run the > > > > >>> client inside the cluster". > > > > >>> > > > > >>> Besides, refer to the discussion with Till[1], it would be > helpful > > we > > > > >>> follow the same process > > > > >>> of session mode for that of "per-job" mode in user perspective, > > that > > > we > > > > >>> don't use > > > > >>> OptimizedPlanEnvironment to create JobGraph, but directly deploy > > > Flink > > > > >>> cluster in env.execute. > > > > >>> > > > > >>> Generally 2 points > > > > >>> > > > > >>> 1. Running Flink job by invoke user main method and execute > > > throughout, > > > > >>> instead of create > > > > >>> JobGraph from main-class. > > > > >>> 2. Run the client inside the cluster. > > > > >>> > > > > >>> If 1 and 2 are implemented. There is obvious no need for DETACHED > > > mode > > > > in > > > > >>> cluster side > > > > >>> because we just shutdown the cluster on the exit of client that > > > running > > > > >>> inside cluster. Whether > > > > >>> or not delivered the result is up to user code. 
> > > > >>> > > > > >>> [1] > > > > >>> > > > > >> > > > > > > > > > > https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388 > > > > >>> [2] > > > > >>> > > > > >> > > > > > > > > > > https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E > > > > >>> > > > > >>> > > > > >>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午2:13写道: > > > > >>> > > > > >>>> Thanks for your replies Kostas & Aljoscha! > > > > >>>> > > > > >>>> Below are replies point by point. > > > > >>>> > > > > >>>> 1. For DETACHED mode, what I said there is about the DETACHED > mode > > > in > > > > >>>> client side. > > > > >>>> There are two configurations overload the item DETACHED[1]. > > > > >>>> > > > > >>>> In client side, it means whether or not client.submitJob is > > blocking > > > > to > > > > >>>> job execution result. > > > > >>>> Due to client.submitJob returns CompletableFuture<JobClient> > > > > >> NON-DETACHED > > > > >>>> is no > > > > >>>> power at all. Caller of submitJob makes the decision whether or > > not > > > > >>>> blocking to get the > > > > >>>> JobClient and request for the job execution result. If client > > > crashes, > > > > >> it > > > > >>>> is a user scope > > > > >>>> exception that should be handled in user code; if client lost > > > > connection > > > > >>>> to cluster, we have > > > > >>>> a retry times and interval configuration that automatically > retry > > > and > > > > >>>> throws an user scope > > > > >>>> exception if exceed. > > > > >>>> > > > > >>>> Your comment about poll for result or job result sounds like a > > > concern > > > > >> on > > > > >>>> cluster side. > > > > >>>> > > > > >>>> In cluster side, DETACHED mode is alive only in JobCluster. 
If > > > > DETACHED > > > > >>>> configured, > > > > >>>> JobCluster exits on job finished; if NON-DETACHED configured, > > > > JobCluster > > > > >>>> exits on job > > > > >>>> execution result delivered. FLIP-74 doesn't stick to changes on > > this > > > > >>>> scope, it is just remained. > > > > >>>> > > > > >>>> However, it is an interesting part we can revisit this > > > implementation > > > > a > > > > >>>> bit. > > > > >>>> > > > > >>>> <see the next email for compact reply in this one> > > > > >>>> > > > > >>>> 2. The retrieval of JobClient is so important that if we don't > > have > > > a > > > > >> way > > > > >>>> to retrieve JobClient it is > > > > >>>> a dumb public user-facing interface(what a strange state :P). > > > > >>>> > > > > >>>> About the retrieval of JobClient, as mentioned in the document, > > two > > > > ways > > > > >>>> should be supported. > > > > >>>> > > > > >>>> (1). Retrieved as return type of job submission. > > > > >>>> (2). Retrieve a JobClient of existing job.(with job id) > > > > >>>> > > > > >>>> I highly respect your thoughts about how Executors should be and > > > > >> thoughts > > > > >>>> on multi-layered clients. > > > > >>>> Although, (2) is not supported by public interfaces as summary > of > > > > >>>> discussion above, we can discuss > > > > >>>> a bit on the place of Executors on multi-layered clients and > find > > a > > > > way > > > > >>>> to retrieve JobClient of > > > > >>>> existing job with public client API. I will comment in FLIP-73 > > > > thread[2] > > > > >>>> since it is almost about Executors. > > > > >>>> > > > > >>>> Best, > > > > >>>> tison. 
> > > > >>>> > > > > >>>> [1] > > > > >>>> > > > > >> > > > > > > > > > > https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8 > > > > >>>> [2] > > > > >>>> > > > > >> > > > > > > > > > > https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> Kostas Kloudas <[hidden email]> 于2019年9月25日周三 下午9:29写道: > > > > >>>> > > > > >>>>> Hi Tison, > > > > >>>>> > > > > >>>>> Thanks for the FLIP and launching the discussion! > > > > >>>>> > > > > >>>>> As a first note, big +1 on providing/exposing a JobClient to > the > > > > users! > > > > >>>>> > > > > >>>>> Some points that would be nice to be clarified: > > > > >>>>> 1) You mention that we can get rid of the DETACHED mode: I > agree > > > that > > > > >>>>> at a high level, given that everything will now be > asynchronous, > > > > there > > > > >>>>> is no need to keep the DETACHED mode but I think we should > > specify > > > > >>>>> some aspects. For example, without the explicit separation of > the > > > > >>>>> modes, what happens when the job finishes. Does the client > > > > >>>>> periodically poll for the result always or the result is pushed > > > when > > > > >>>>> in NON-DETACHED mode? What happens if the client disconnects > and > > > > >>>>> reconnects? > > > > >>>>> > > > > >>>>> 2) On the "how to retrieve a JobClient for a running Job", I > > think > > > > >>>>> this is related to the other discussion you opened in the ML > > about > > > > >>>>> multi-layered clients. First of all, I agree that exposing > > > different > > > > >>>>> "levels" of clients would be a nice addition, and actually > there > > > have > > > > >>>>> been some discussions about doing so in the future. 
Now for > this > > > > >>>>> specific discussion: > > > > >>>>> i) I do not think that we should expose the > > > > >>>>> ClusterDescriptor/ClusterSpecification to the user, as this > ties > > us > > > > to > > > > >>>>> a specific architecture which may change in the future. > > > > >>>>> ii) I do not think it should be the Executor that will > > provide > > > a > > > > >>>>> JobClient for an already running job (only for the Jobs that it > > > > >>>>> submits). The job of the executor should just be to execute() a > > > > >>>>> pipeline. > > > > >>>>> iii) I think a solution that respects the separation of > > > concerns > > > > >>>>> could be the addition of another component (in the future), > > > something > > > > >>>>> like a ClientFactory, or ClusterFactory that will have methods > > > like: > > > > >>>>> ClusterClient createCluster(Configuration), JobClient > > > > >>>>> retrieveJobClient(Configuration , JobId), maybe even (although > > not > > > > >>>>> sure) Executor getExecutor(Configuration ) and maybe more. This > > > > >>>>> component would be responsible to interact with a cluster > manager > > > > like > > > > >>>>> Yarn and do what is now being done by the ClusterDescriptor > plus > > > some > > > > >>>>> more stuff. > > > > >>>>> > > > > >>>>> Although under the hood all these abstractions (Environments, > > > > >>>>> Executors, ...) underneath use the same clients, I believe > their > > > > >>>>> job/existence is not contradicting but they simply hide some of > > the > > > > >>>>> complexity from the user, and give us, as developers some > freedom > > > to > > > > >>>>> change in the future some of the parts. For example, the > executor > > > > will > > > > >>>>> take a Pipeline, create a JobGraph and submit it, instead of > > > > requiring > > > > >>>>> the user to do each step separately. This allows us to, for > > > example, > > > > >>>>> get rid of the Plan if in the future everything is DataStream. 
> > > > >>>>> Essentially, I think of these as layers of an onion with the > > > clients > > > > >>>>> being close to the core. The higher you go, the more > > functionality > > > is > > > > >>>>> included and hidden from the public eye. > > > > >>>>> > > > > >>>>> Point iii) by the way is just a thought and by no means final. > I > > > also > > > > >>>>> like the idea of multi-layered clients so this may spark up the > > > > >>>>> discussion. > > > > >>>>> > > > > >>>>> Cheers, > > > > >>>>> Kostas > > > > >>>>> > > > > >>>>> On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek < > > > > [hidden email]> > > > > >>>>> wrote: > > > > >>>>>> > > > > >>>>>> Hi Tison, > > > > >>>>>> > > > > >>>>>> Thanks for proposing the document! I had some comments on the > > > > >> document. > > > > >>>>>> > > > > >>>>>> I think the only complex thing that we still need to figure > out > > is > > > > >> how > > > > >>>>> to get a JobClient for a job that is already running. As you > > > > mentioned > > > > >> in > > > > >>>>> the document. Currently I’m thinking that its ok to add a > method > > to > > > > >>>>> Executor for retrieving a JobClient for a running job by > > providing > > > an > > > > >> ID. > > > > >>>>> Let’s see what Kostas has to say on the topic. > > > > >>>>>> > > > > >>>>>> Best, > > > > >>>>>> Aljoscha > > > > >>>>>> > > > > >>>>>>> On 25. Sep 2019, at 12:31, Zili Chen <[hidden email]> > > > wrote: > > > > >>>>>>> > > > > >>>>>>> Hi all, > > > > >>>>>>> > > > > >>>>>>> Summary from the discussion about introducing Flink JobClient > > > > >> API[1] > > > > >>>>> we > > > > >>>>>>> draft FLIP-74[2] to > > > > >>>>>>> gather thoughts and towards a standard public user-facing > > > > >> interfaces. > > > > >>>>>>> > > > > >>>>>>> This discussion thread aims at standardizing job level client > > > API. 
> > > > >>>>> But I'd > > > > >>>>>>> like to emphasize that > > > > >>>>>>> how to retrieve JobClient possibly causes further discussion > on > > > > >>>>> different > > > > >>>>>>> level clients exposed from > > > > >>>>>>> Flink so that a following thread will be started later to > > > > >> coordinate > > > > >>>>>>> FLIP-73 and FLIP-74 on > > > > >>>>>>> expose issue. > > > > >>>>>>> > > > > >>>>>>> Looking forward to your opinions. > > > > >>>>>>> > > > > >>>>>>> Best, > > > > >>>>>>> tison. > > > > >>>>>>> > > > > >>>>>>> [1] > > > > >>>>>>> > > > > >>>>> > > > > >> > > > > > > > > > > https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E > > > > >>>>>>> [2] > > > > >>>>>>> > > > > >>>>> > > > > >> > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API > > > > >>>>>> > > > > >>>>> > > > > >>>> > > > > > > > > > > > > > > -- > > > > > > Konstantin Knauf | Solutions Architect > > > > > > +49 160 91394525 > > > > > > > > > Follow us @VervericaData Ververica <https://www.ververica.com/> > > > > > > > > > -- > > > > > > Join Flink Forward <https://flink-forward.org/> - The Apache Flink > > > Conference > > > > > > Stream Processing | Event Driven | Real Time > > > > > > -- > > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > > > > > -- > > > Ververica GmbH > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji > > > (Tony) Cheng > > > > > > > > -- > > Konstantin Knauf | Solutions Architect > > +49 160 91394525 > > > Follow us @VervericaData Ververica <https://www.ververica.com/> > > > -- > > Join Flink Forward <https://flink-forward.org/> - The Apache Flink > Conference > > Stream Processing | Event Driven | Real Time > > -- > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > -- > Ververica GmbH > Registered at Amtsgericht 
Charlottenburg: HRB 158244 B > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji > (Tony) Cheng > |
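[Editor's note: to make the two points just settled concrete — dropping cancelWithSavepoint and renaming stopWithSavepoint to plain stop — here is a minimal, hypothetical sketch of what the resulting asynchronous job-level client could look like. It is illustrative only, not the actual FLIP-74 interface, and the stub implementation exists solely to show the call pattern.]

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the job-level client under discussion;
// not the actual FLIP-74 interface.
interface JobClient {
    CompletableFuture<String> getJobStatus();
    // "stop" always takes a savepoint (the CLI already uses this name),
    // so no separate stopWithSavepoint; completes with the savepoint path.
    CompletableFuture<String> stop(String savepointDirectory);
    // Plain cancel stays; cancelWithSavepoint is left out because it is
    // already deprecated in the REST API and CLI.
    CompletableFuture<Void> cancel();
}

// In-memory stub to demonstrate how callers drive the asynchronous methods.
class StubJobClient implements JobClient {
    private final AtomicReference<String> status = new AtomicReference<>("RUNNING");

    public CompletableFuture<String> getJobStatus() {
        return CompletableFuture.completedFuture(status.get());
    }

    public CompletableFuture<String> stop(String savepointDirectory) {
        status.set("FINISHED");
        return CompletableFuture.completedFuture(savepointDirectory + "/savepoint-0");
    }

    public CompletableFuture<Void> cancel() {
        status.set("CANCELED");
        return CompletableFuture.completedFuture(null);
    }
}
```

Because every method returns a CompletableFuture, each caller decides whether to block with join() or compose callbacks — which is exactly why a client-side NON-DETACHED mode carries no extra power once the API is fully asynchronous.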
Hi all,
Narrowing the scope to FLIP-74, we aim to introduce a useful and extensible user-facing public interface, JobClient. Let me re-emphasize the two major pieces of work under this thread.

1. standard interface

As FLIP-74 introduces an interface JobClient with its methods, we'd like to make it a standard (non-final, since we can always extend it on demand) interface. On this branch I'd like to, following Konstantin's suggestion, 1) exclude the deprecated cancelWithSavepoint from the proposal and 2) rename stopWithSavepoint to stop to keep consistency with our CLI command. If there are no more concerns on these topics I will update the proposal tomorrow.

2. client interfaces are asynchronous

If the asynchronous JobClient interfaces are approved, a necessary proposed change is a corresponding update to the ClusterClient interfaces. ClusterClient remains an internal concept after this FLIP, but the change might have some impact, so I think it's better to reach a community consensus as a prerequisite. Note that once all client methods are asynchronous, the client-side DETACHED option has no effect, whether or not we remove it.

Let me know your ideas on these topics and let's keep moving forward :-)

Best, tison.

Zili Chen <[hidden email]> wrote on Wed, 2 Oct 2019 at 16:10: > Hi Konstantin, > > * should we add "cancelWithSavepoint" to a new public API, when we have > deprecated the corresponding REST API/CLI methods? In my understanding > there is no reason to use it anymore. > > Good point. We can exclude "cancelWithSavepoint" from public API at least > for now, > since it is deprecated already. Let's see if there is other concerns. > > * should we call "stopWithSavepoint" simply "stop" as "stop" always > performs a savepoint? > > Well for naming issue I'm fine with that if it is a consensus of our > community. I can see > there is a "stop" CLI command which means "stop with savepoint". > > Best, > tison. > > > Konstantin Knauf <[hidden email]> wrote on Mon, 30 Sep 2019 at 12:16: > >> Hi Thomas, >> >> maybe there is a misunderstanding. 
There is no plan to deprecate anything >> in the REST API in the process of introducing the JobClient API, and it >> shouldn't. >> >> Since "cancel with savepoint" was already deprecated in the REST API and >> CLI, I am just raising the question whether to add it to the JobClient API >> in the first place. >> >> Best, >> >> Konstantin >> >> >> >> On Mon, Sep 30, 2019 at 1:16 AM Thomas Weise <[hidden email]> wrote: >> >> > I did not realize there was a plan to deprecate anything in the REST >> API? >> > >> > The REST API is super important for tooling written in non JVM >> languages, >> > that does not include a Flink client (like FlinkK8sOperator). The REST >> API >> > should continue to support all job management operations, including job >> > submission. >> > >> > Thomas >> > >> > >> > On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf < >> [hidden email] >> > > >> > wrote: >> > >> > > Hi Zili, >> > > >> > > thanks for working on this topic. Just read through the FLIP and I >> have >> > two >> > > questions: >> > > >> > > * should we add "cancelWithSavepeoint" to a new public API, when we >> have >> > > deprecated the corresponding REST API/CLI methods? In my understanding >> > > there is no reason to use it anymore. >> > > * should we call "stopWithSavepoint" simply "stop" as "stop" always >> > > performs a savepoint? >> > > >> > > Best, >> > > >> > > Konstantin >> > > >> > > >> > > >> > > On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek < >> [hidden email]> >> > > wrote: >> > > >> > > > Hi Flavio, >> > > > >> > > > I agree that this would be good to have. But I also think that this >> is >> > > > outside the scope of FLIP-74, I think it is an orthogonal feature. >> > > > >> > > > Best, >> > > > Aljoscha >> > > > >> > > > > On 27. 
Sep 2019, at 10:31, Flavio Pompermaier < >> [hidden email]> >> > > > wrote: >> > > > > >> > > > > Hi all, >> > > > > just a remark about the Flink REST APIs (and its client as well): >> > > almost >> > > > > all the times we need a way to dynamically know which jobs are >> > > contained >> > > > in >> > > > > a jar file, and this could be exposed by the REST endpoint under >> > > > > /jars/:jarid/entry-points (a simple way to implement this would >> be to >> > > > check >> > > > > the value of Main-class or Main-classes inside the Manifest of the >> > jar >> > > if >> > > > > they exists [1]). >> > > > > >> > > > > I understand that this is something that is not strictly required >> to >> > > > > execute Flink jobs but IMHO it would ease A LOT the work of UI >> > > developers >> > > > > that could have a way to show the users all available jobs inside >> a >> > > jar + >> > > > > their configurable parameters. >> > > > > For example, right now in the WebUI, you can upload a jar and then >> > you >> > > > have >> > > > > to set (without any autocomplete or UI support) the main class and >> > > their >> > > > > params (for example using a string like --param1 xx --param2 yy). >> > > > > Adding this functionality to the REST API and the respective >> client >> > > would >> > > > > enable the WebUI (and all UIs interacting with a Flink cluster) to >> > > > prefill >> > > > > a dropdown list containing the list of entry-point classes (i.e. >> > Flink >> > > > > jobs) and, once selected, their required (typed) parameters. 
>> > > > > >> > > > > Best, >> > > > > Flavio >> > > > > >> > > > > [1] https://issues.apache.org/jira/browse/FLINK-10864 >> > > > > >> > > > > On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]> >> > > wrote: >> > > > > >> > > > >> modify >> > > > >> >> > > > >> /we just shutdown the cluster on the exit of client that running >> > > inside >> > > > >> cluster/ >> > > > >> >> > > > >> to >> > > > >> >> > > > >> we just shutdown the cluster on both the exit of client that >> running >> > > > inside >> > > > >> cluster and the finish of job. >> > > > >> Since client is running inside cluster we can easily wait for the >> > end >> > > of >> > > > >> two both in ClusterEntrypoint. >> > > > >> >> > > > >> >> > > > >> Zili Chen <[hidden email]> 于2019年9月27日周五 下午3:13写道: >> > > > >> >> > > > >>> About JobCluster >> > > > >>> >> > > > >>> Actually I am not quite sure what we gains from DETACHED >> > > configuration >> > > > on >> > > > >>> cluster side. >> > > > >>> We don't have a NON-DETACHED JobCluster in fact in our codebase, >> > > right? >> > > > >>> >> > > > >>> It comes to me one major questions we have to answer first. >> > > > >>> >> > > > >>> *What JobCluster conceptually is exactly* >> > > > >>> >> > > > >>> Related discussion can be found in JIRA[1] and mailing list[2]. >> > > Stephan >> > > > >>> gives a nice >> > > > >>> description of JobCluster: >> > > > >>> >> > > > >>> Two things to add: - The job mode is very nice in the way that >> it >> > > runs >> > > > >> the >> > > > >>> client inside the cluster (in the same image/process that is the >> > JM) >> > > > and >> > > > >>> thus unifies both applications and what the Spark world calls >> the >> > > > "driver >> > > > >>> mode". - Another thing I would add is that during the FLIP-6 >> > design, >> > > we >> > > > >>> were thinking about setups where Dispatcher and JobManager are >> > > separate >> > > > >>> processes. 
A Yarn or Mesos Dispatcher of a session could run >> > > > >> independently >> > > > >>> (even as privileged processes executing no code). Then you the >> > > > "per-job" >> > > > >>> mode could still be helpful: when a job is submitted to the >> > > dispatcher, >> > > > >> it >> > > > >>> launches the JM again in a per-job mode, so that JM and TM >> > processes >> > > > are >> > > > >>> bound to teh job only. For higher security setups, it is >> important >> > > that >> > > > >>> processes are not reused across jobs. >> > > > >>> >> > > > >>> However, currently in "per-job" mode we generate JobGraph in >> client >> > > > side, >> > > > >>> launching >> > > > >>> the JobCluster and retrieve the JobGraph for execution. So >> > actually, >> > > we >> > > > >>> don't "run the >> > > > >>> client inside the cluster". >> > > > >>> >> > > > >>> Besides, refer to the discussion with Till[1], it would be >> helpful >> > we >> > > > >>> follow the same process >> > > > >>> of session mode for that of "per-job" mode in user perspective, >> > that >> > > we >> > > > >>> don't use >> > > > >>> OptimizedPlanEnvironment to create JobGraph, but directly deploy >> > > Flink >> > > > >>> cluster in env.execute. >> > > > >>> >> > > > >>> Generally 2 points >> > > > >>> >> > > > >>> 1. Running Flink job by invoke user main method and execute >> > > throughout, >> > > > >>> instead of create >> > > > >>> JobGraph from main-class. >> > > > >>> 2. Run the client inside the cluster. >> > > > >>> >> > > > >>> If 1 and 2 are implemented. There is obvious no need for >> DETACHED >> > > mode >> > > > in >> > > > >>> cluster side >> > > > >>> because we just shutdown the cluster on the exit of client that >> > > running >> > > > >>> inside cluster. Whether >> > > > >>> or not delivered the result is up to user code. 
>> > > > >>> >> > > > >>> [1] >> > > > >>> >> > > > >> >> > > > >> > > >> > >> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388 >> > > > >>> [2] >> > > > >>> >> > > > >> >> > > > >> > > >> > >> https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E >> > > > >>> >> > > > >>> >> > > > >>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午2:13写道: >> > > > >>> >> > > > >>>> Thanks for your replies Kostas & Aljoscha! >> > > > >>>> >> > > > >>>> Below are replies point by point. >> > > > >>>> >> > > > >>>> 1. For DETACHED mode, what I said there is about the DETACHED >> mode >> > > in >> > > > >>>> client side. >> > > > >>>> There are two configurations overload the item DETACHED[1]. >> > > > >>>> >> > > > >>>> In client side, it means whether or not client.submitJob is >> > blocking >> > > > to >> > > > >>>> job execution result. >> > > > >>>> Due to client.submitJob returns CompletableFuture<JobClient> >> > > > >> NON-DETACHED >> > > > >>>> is no >> > > > >>>> power at all. Caller of submitJob makes the decision whether or >> > not >> > > > >>>> blocking to get the >> > > > >>>> JobClient and request for the job execution result. If client >> > > crashes, >> > > > >> it >> > > > >>>> is a user scope >> > > > >>>> exception that should be handled in user code; if client lost >> > > > connection >> > > > >>>> to cluster, we have >> > > > >>>> a retry times and interval configuration that automatically >> retry >> > > and >> > > > >>>> throws an user scope >> > > > >>>> exception if exceed. >> > > > >>>> >> > > > >>>> Your comment about poll for result or job result sounds like a >> > > concern >> > > > >> on >> > > > >>>> cluster side. >> > > > >>>> >> > > > >>>> In cluster side, DETACHED mode is alive only in JobCluster. 
If >> > > > DETACHED >> > > > >>>> configured, >> > > > >>>> JobCluster exits on job finished; if NON-DETACHED configured, >> > > > JobCluster >> > > > >>>> exits on job >> > > > >>>> execution result delivered. FLIP-74 doesn't stick to changes on >> > this >> > > > >>>> scope, it is just remained. >> > > > >>>> >> > > > >>>> However, it is an interesting part we can revisit this >> > > implementation >> > > > a >> > > > >>>> bit. >> > > > >>>> >> > > > >>>> <see the next email for compact reply in this one> >> > > > >>>> >> > > > >>>> 2. The retrieval of JobClient is so important that if we don't >> > have >> > > a >> > > > >> way >> > > > >>>> to retrieve JobClient it is >> > > > >>>> a dumb public user-facing interface(what a strange state :P). >> > > > >>>> >> > > > >>>> About the retrieval of JobClient, as mentioned in the document, >> > two >> > > > ways >> > > > >>>> should be supported. >> > > > >>>> >> > > > >>>> (1). Retrieved as return type of job submission. >> > > > >>>> (2). Retrieve a JobClient of existing job.(with job id) >> > > > >>>> >> > > > >>>> I highly respect your thoughts about how Executors should be >> and >> > > > >> thoughts >> > > > >>>> on multi-layered clients. >> > > > >>>> Although, (2) is not supported by public interfaces as summary >> of >> > > > >>>> discussion above, we can discuss >> > > > >>>> a bit on the place of Executors on multi-layered clients and >> find >> > a >> > > > way >> > > > >>>> to retrieve JobClient of >> > > > >>>> existing job with public client API. I will comment in FLIP-73 >> > > > thread[2] >> > > > >>>> since it is almost about Executors. >> > > > >>>> >> > > > >>>> Best, >> > > > >>>> tison. 
>> > > > >>>> >> > > > >>>> [1] >> > > > >>>> >> > > > >> >> > > > >> > > >> > >> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8 >> > > > >>>> [2] >> > > > >>>> >> > > > >> >> > > > >> > > >> > >> https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E >> > > > >>>> >> > > > >>>> >> > > > >>>> >> > > > >>>> >> > > > >>>> Kostas Kloudas <[hidden email]> 于2019年9月25日周三 下午9:29写道: >> > > > >>>> >> > > > >>>>> Hi Tison, >> > > > >>>>> >> > > > >>>>> Thanks for the FLIP and launching the discussion! >> > > > >>>>> >> > > > >>>>> As a first note, big +1 on providing/exposing a JobClient to >> the >> > > > users! >> > > > >>>>> >> > > > >>>>> Some points that would be nice to be clarified: >> > > > >>>>> 1) You mention that we can get rid of the DETACHED mode: I >> agree >> > > that >> > > > >>>>> at a high level, given that everything will now be >> asynchronous, >> > > > there >> > > > >>>>> is no need to keep the DETACHED mode but I think we should >> > specify >> > > > >>>>> some aspects. For example, without the explicit separation of >> the >> > > > >>>>> modes, what happens when the job finishes. Does the client >> > > > >>>>> periodically poll for the result always or the result is >> pushed >> > > when >> > > > >>>>> in NON-DETACHED mode? What happens if the client disconnects >> and >> > > > >>>>> reconnects? >> > > > >>>>> >> > > > >>>>> 2) On the "how to retrieve a JobClient for a running Job", I >> > think >> > > > >>>>> this is related to the other discussion you opened in the ML >> > about >> > > > >>>>> multi-layered clients. First of all, I agree that exposing >> > > different >> > > > >>>>> "levels" of clients would be a nice addition, and actually >> there >> > > have >> > > > >>>>> been some discussions about doing so in the future. 
Now for >> this >> > > > >>>>> specific discussion: >> > > > >>>>> i) I do not think that we should expose the >> > > > >>>>> ClusterDescriptor/ClusterSpecification to the user, as this >> ties >> > us >> > > > to >> > > > >>>>> a specific architecture which may change in the future. >> > > > >>>>> ii) I do not think it should be the Executor that will >> > provide >> > > a >> > > > >>>>> JobClient for an already running job (only for the Jobs that >> it >> > > > >>>>> submits). The job of the executor should just be to execute() >> a >> > > > >>>>> pipeline. >> > > > >>>>> iii) I think a solution that respects the separation of >> > > concerns >> > > > >>>>> could be the addition of another component (in the future), >> > > something >> > > > >>>>> like a ClientFactory, or ClusterFactory that will have methods >> > > like: >> > > > >>>>> ClusterClient createCluster(Configuration), JobClient >> > > > >>>>> retrieveJobClient(Configuration , JobId), maybe even (although >> > not >> > > > >>>>> sure) Executor getExecutor(Configuration ) and maybe more. >> This >> > > > >>>>> component would be responsible to interact with a cluster >> manager >> > > > like >> > > > >>>>> Yarn and do what is now being done by the ClusterDescriptor >> plus >> > > some >> > > > >>>>> more stuff. >> > > > >>>>> >> > > > >>>>> Although under the hood all these abstractions (Environments, >> > > > >>>>> Executors, ...) underneath use the same clients, I believe >> their >> > > > >>>>> job/existence is not contradicting but they simply hide some >> of >> > the >> > > > >>>>> complexity from the user, and give us, as developers some >> freedom >> > > to >> > > > >>>>> change in the future some of the parts. For example, the >> executor >> > > > will >> > > > >>>>> take a Pipeline, create a JobGraph and submit it, instead of >> > > > requiring >> > > > >>>>> the user to do each step separately. 
This allows us to, for >> > > example, >> > > > >>>>> get rid of the Plan if in the future everything is DataStream. >> > > > >>>>> Essentially, I think of these as layers of an onion with the >> > > clients >> > > > >>>>> being close to the core. The higher you go, the more >> > functionality >> > > is >> > > > >>>>> included and hidden from the public eye. >> > > > >>>>> >> > > > >>>>> Point iii) by the way is just a thought and by no means >> final. I >> > > also >> > > > >>>>> like the idea of multi-layered clients so this may spark up >> the >> > > > >>>>> discussion. >> > > > >>>>> >> > > > >>>>> Cheers, >> > > > >>>>> Kostas >> > > > >>>>> >> > > > >>>>> On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek < >> > > > [hidden email]> >> > > > >>>>> wrote: >> > > > >>>>>> >> > > > >>>>>> Hi Tison, >> > > > >>>>>> >> > > > >>>>>> Thanks for proposing the document! I had some comments on the >> > > > >> document. >> > > > >>>>>> >> > > > >>>>>> I think the only complex thing that we still need to figure >> out >> > is >> > > > >> how >> > > > >>>>> to get a JobClient for a job that is already running. As you >> > > > mentioned >> > > > >> in >> > > > >>>>> the document. Currently I’m thinking that its ok to add a >> method >> > to >> > > > >>>>> Executor for retrieving a JobClient for a running job by >> > providing >> > > an >> > > > >> ID. >> > > > >>>>> Let’s see what Kostas has to say on the topic. >> > > > >>>>>> >> > > > >>>>>> Best, >> > > > >>>>>> Aljoscha >> > > > >>>>>> >> > > > >>>>>>> On 25. Sep 2019, at 12:31, Zili Chen <[hidden email]> >> > > wrote: >> > > > >>>>>>> >> > > > >>>>>>> Hi all, >> > > > >>>>>>> >> > > > >>>>>>> Summary from the discussion about introducing Flink >> JobClient >> > > > >> API[1] >> > > > >>>>> we >> > > > >>>>>>> draft FLIP-74[2] to >> > > > >>>>>>> gather thoughts and towards a standard public user-facing >> > > > >> interfaces. 
>> > > > >>>>>>> >> > > > >>>>>>> This discussion thread aims at standardizing job level >> client >> > > API. >> > > > >>>>> But I'd >> > > > >>>>>>> like to emphasize that >> > > > >>>>>>> how to retrieve JobClient possibly causes further >> discussion on >> > > > >>>>> different >> > > > >>>>>>> level clients exposed from >> > > > >>>>>>> Flink so that a following thread will be started later to >> > > > >> coordinate >> > > > >>>>>>> FLIP-73 and FLIP-74 on >> > > > >>>>>>> expose issue. >> > > > >>>>>>> >> > > > >>>>>>> Looking forward to your opinions. >> > > > >>>>>>> >> > > > >>>>>>> Best, >> > > > >>>>>>> tison. >> > > > >>>>>>> >> > > > >>>>>>> [1] >> > > > >>>>>>> >> > > > >>>>> >> > > > >> >> > > > >> > > >> > >> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E >> > > > >>>>>>> [2] >> > > > >>>>>>> >> > > > >>>>> >> > > > >> >> > > > >> > > >> > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API >> > > > >>>>>> >> > > > >>>>> >> > > > >>>> >> > > > >> > > > >> > > >> > > -- >> > > >> > > Konstantin Knauf | Solutions Architect >> > > >> > > +49 160 91394525 >> > > >> > > >> > > Follow us @VervericaData Ververica <https://www.ververica.com/> >> > > >> > > >> > > -- >> > > >> > > Join Flink Forward <https://flink-forward.org/> - The Apache Flink >> > > Conference >> > > >> > > Stream Processing | Event Driven | Real Time >> > > >> > > -- >> > > >> > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >> > > >> > > -- >> > > Ververica GmbH >> > > Registered at Amtsgericht Charlottenburg: HRB 158244 B >> > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, >> Ji >> > > (Tony) Cheng >> > > >> > >> >> >> -- >> >> Konstantin Knauf | Solutions Architect >> >> +49 160 91394525 >> >> >> Follow us @VervericaData Ververica <https://www.ververica.com/> >> >> >> -- >> >> Join Flink Forward <https://flink-forward.org/> - The Apache 
Flink >> Conference >> >> Stream Processing | Event Driven | Real Time >> >> -- >> >> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >> >> -- >> Ververica GmbH >> Registered at Amtsgericht Charlottenburg: HRB 158244 B >> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji >> (Tony) Cheng >> > |
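The client-side argument in the message above — that once client.submitJob returns a CompletableFuture<JobClient>, a separate DETACHED/NON-DETACHED switch carries no power, because the caller alone decides whether to block — can be illustrated with a small sketch. All names below are hypothetical stand-ins for the draft FLIP-74 interfaces, not the actual Flink API:

```java
import java.util.concurrent.CompletableFuture;

public class SubmitSketch {
    // Hypothetical stand-in for the FLIP-74 JobClient; the real draft has more methods.
    interface JobClient {
        CompletableFuture<String> getJobExecutionResult();
    }

    // Hypothetical stand-in for an asynchronous client: submission always
    // returns a future immediately, so no client-side DETACHED flag is needed.
    static CompletableFuture<JobClient> submitJob(String jobName) {
        JobClient client = () -> CompletableFuture.completedFuture(jobName + ": FINISHED");
        return CompletableFuture.completedFuture(client);
    }

    public static void main(String[] args) throws Exception {
        // "Detached" style: simply do not wait on the returned future.
        CompletableFuture<JobClient> fireAndForget = submitJob("job-a");

        // "Attached" style: the caller chooses to block on the same future,
        // which is why a separate NON-DETACHED mode adds nothing.
        JobClient client = submitJob("job-b").get();
        System.out.println(client.getJobExecutionResult().get()); // job-b: FINISHED
    }
}
```

The point of the sketch is only that both behaviours fall out of one asynchronous method, matching tison's argument that the mode distinction becomes a caller concern.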
Hi Kostas,
By mention "integration to be a follow up discussion" in FLIP-73 discussion I think I'm more on the context if FLIP-74 because without including the retrieval of JobClient in FLIP-74 we actually introduce a dummy public interface. 1. return JobClient from Executor#execute actually has a dependency of FLIP-73. 2. retrieve JobClient of an existing job directly lead to the discussion of the retrieval chains which I started as [DISCUSS] Expose multiple level clients. Best, tison. Zili Chen <[hidden email]> 于2019年10月3日周四 上午2:35写道: > Hi all, > > Narrow the scope to FLIP-74 we aimed at introduce a useful and extensible > user-facing public interface JobClient. Let me reemphasize two major works > under this thread. > > 1. standard interface > > As in FLIP-74 we introduce an interface JobClient with its methods, we'd > like to > make it a standard (non-final since we can always extends on demand) > interface. > > On this branch I'd like to, with respect to Konstantin's suggestion, 1) > exclude deprecated > cancelWithSavepoint from the proposal 2) rename stopWithSavepoint to stop > to keep > consistency with our CLI command. If there is no more concern on these > topics I will > update proposal tomorrow. > > 2. client interfaces are asynchronous > > If the asynchronous JobClient interfaces approved, a necessary proposed > changed is > corresponding update ClusterClient interfaces. Still ClusterClient is an > internal concept > after this FLIP but it might have some impact so I think it's better to > reach a community > consensus as prerequisite. Note that with all client methods are > asynchronous, no matter > whether or not we remove client side detach option it is no power. > > Let me know your ideas on these topic and keep moving forward :-) > > Best, > tison. > > > Zili Chen <[hidden email]> 于2019年10月2日周三 下午4:10写道: > >> Hi Konstantin, >> >> * should we add "cancelWithSavepeoint" to a new public API, when we have >> deprecated the corresponding REST API/CLI methods? 
In my understanding >> there is no reason to use it anymore. >> >> Good point. We can exclude "cancelWithSavepoint" from public API at least >> for now, >> since it is deprecated already. Let's see if there is other concerns. >> >> * should we call "stopWithSavepoint" simply "stop" as "stop" always >> performs a savepoint? >> >> Well for naming issue I'm fine with that if it is a consensus of our >> community. I can see >> there is a "stop" CLI command which means "stop with savepoint". >> >> Best, >> tison. >> >> >> Konstantin Knauf <[hidden email]> 于2019年9月30日周一 下午12:16写道: >> >>> Hi Thomas, >>> >>> maybe there is a misunderstanding. There is no plan to deprecate anything >>> in the REST API in the process of introducing the JobClient API, and it >>> shouldn't. >>> >>> Since "cancel with savepoint" was already deprecated in the REST API and >>> CLI, I am just raising the question whether to add it to the JobClient >>> API >>> in the first place. >>> >>> Best, >>> >>> Konstantin >>> >>> >>> >>> On Mon, Sep 30, 2019 at 1:16 AM Thomas Weise <[hidden email]> wrote: >>> >>> > I did not realize there was a plan to deprecate anything in the REST >>> API? >>> > >>> > The REST API is super important for tooling written in non JVM >>> languages, >>> > that does not include a Flink client (like FlinkK8sOperator). The REST >>> API >>> > should continue to support all job management operations, including job >>> > submission. >>> > >>> > Thomas >>> > >>> > >>> > On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf < >>> [hidden email] >>> > > >>> > wrote: >>> > >>> > > Hi Zili, >>> > > >>> > > thanks for working on this topic. Just read through the FLIP and I >>> have >>> > two >>> > > questions: >>> > > >>> > > * should we add "cancelWithSavepeoint" to a new public API, when we >>> have >>> > > deprecated the corresponding REST API/CLI methods? In my >>> understanding >>> > > there is no reason to use it anymore. 
>>> > > * should we call "stopWithSavepoint" simply "stop" as "stop" always >>> > > performs a savepoint? >>> > > >>> > > Best, >>> > > >>> > > Konstantin >>> > > >>> > > >>> > > >>> > > On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek < >>> [hidden email]> >>> > > wrote: >>> > > >>> > > > Hi Flavio, >>> > > > >>> > > > I agree that this would be good to have. But I also think that >>> this is >>> > > > outside the scope of FLIP-74, I think it is an orthogonal feature. >>> > > > >>> > > > Best, >>> > > > Aljoscha >>> > > > >>> > > > > On 27. Sep 2019, at 10:31, Flavio Pompermaier < >>> [hidden email]> >>> > > > wrote: >>> > > > > >>> > > > > Hi all, >>> > > > > just a remark about the Flink REST APIs (and its client as well): >>> > > almost >>> > > > > all the times we need a way to dynamically know which jobs are >>> > > contained >>> > > > in >>> > > > > a jar file, and this could be exposed by the REST endpoint under >>> > > > > /jars/:jarid/entry-points (a simple way to implement this would >>> be to >>> > > > check >>> > > > > the value of Main-class or Main-classes inside the Manifest of >>> the >>> > jar >>> > > if >>> > > > > they exists [1]). >>> > > > > >>> > > > > I understand that this is something that is not strictly >>> required to >>> > > > > execute Flink jobs but IMHO it would ease A LOT the work of UI >>> > > developers >>> > > > > that could have a way to show the users all available jobs >>> inside a >>> > > jar + >>> > > > > their configurable parameters. >>> > > > > For example, right now in the WebUI, you can upload a jar and >>> then >>> > you >>> > > > have >>> > > > > to set (without any autocomplete or UI support) the main class >>> and >>> > > their >>> > > > > params (for example using a string like --param1 xx --param2 yy). 
>>> > > > > Adding this functionality to the REST API and the respective >>> client >>> > > would >>> > > > > enable the WebUI (and all UIs interacting with a Flink cluster) >>> to >>> > > > prefill >>> > > > > a dropdown list containing the list of entry-point classes (i.e. >>> > Flink >>> > > > > jobs) and, once selected, their required (typed) parameters. >>> > > > > >>> > > > > Best, >>> > > > > Flavio >>> > > > > >>> > > > > [1] https://issues.apache.org/jira/browse/FLINK-10864 >>> > > > > >>> > > > > On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]> >>> > > wrote: >>> > > > > >>> > > > >> modify >>> > > > >> >>> > > > >> /we just shutdown the cluster on the exit of client that running >>> > > inside >>> > > > >> cluster/ >>> > > > >> >>> > > > >> to >>> > > > >> >>> > > > >> we just shutdown the cluster on both the exit of client that >>> running >>> > > > inside >>> > > > >> cluster and the finish of job. >>> > > > >> Since client is running inside cluster we can easily wait for >>> the >>> > end >>> > > of >>> > > > >> two both in ClusterEntrypoint. >>> > > > >> >>> > > > >> >>> > > > >> Zili Chen <[hidden email]> 于2019年9月27日周五 下午3:13写道: >>> > > > >> >>> > > > >>> About JobCluster >>> > > > >>> >>> > > > >>> Actually I am not quite sure what we gains from DETACHED >>> > > configuration >>> > > > on >>> > > > >>> cluster side. >>> > > > >>> We don't have a NON-DETACHED JobCluster in fact in our >>> codebase, >>> > > right? >>> > > > >>> >>> > > > >>> It comes to me one major questions we have to answer first. >>> > > > >>> >>> > > > >>> *What JobCluster conceptually is exactly* >>> > > > >>> >>> > > > >>> Related discussion can be found in JIRA[1] and mailing list[2]. 
>>> > > Stephan >>> > > > >>> gives a nice >>> > > > >>> description of JobCluster: >>> > > > >>> >>> > > > >>> Two things to add: - The job mode is very nice in the way that >>> it >>> > > runs >>> > > > >> the >>> > > > >>> client inside the cluster (in the same image/process that is >>> the >>> > JM) >>> > > > and >>> > > > >>> thus unifies both applications and what the Spark world calls >>> the >>> > > > "driver >>> > > > >>> mode". - Another thing I would add is that during the FLIP-6 >>> > design, >>> > > we >>> > > > >>> were thinking about setups where Dispatcher and JobManager are >>> > > separate >>> > > > >>> processes. A Yarn or Mesos Dispatcher of a session could run >>> > > > >> independently >>> > > > >>> (even as privileged processes executing no code). Then you the >>> > > > "per-job" >>> > > > >>> mode could still be helpful: when a job is submitted to the >>> > > dispatcher, >>> > > > >> it >>> > > > >>> launches the JM again in a per-job mode, so that JM and TM >>> > processes >>> > > > are >>> > > > >>> bound to teh job only. For higher security setups, it is >>> important >>> > > that >>> > > > >>> processes are not reused across jobs. >>> > > > >>> >>> > > > >>> However, currently in "per-job" mode we generate JobGraph in >>> client >>> > > > side, >>> > > > >>> launching >>> > > > >>> the JobCluster and retrieve the JobGraph for execution. So >>> > actually, >>> > > we >>> > > > >>> don't "run the >>> > > > >>> client inside the cluster". >>> > > > >>> >>> > > > >>> Besides, refer to the discussion with Till[1], it would be >>> helpful >>> > we >>> > > > >>> follow the same process >>> > > > >>> of session mode for that of "per-job" mode in user perspective, >>> > that >>> > > we >>> > > > >>> don't use >>> > > > >>> OptimizedPlanEnvironment to create JobGraph, but directly >>> deploy >>> > > Flink >>> > > > >>> cluster in env.execute. >>> > > > >>> >>> > > > >>> Generally 2 points >>> > > > >>> >>> > > > >>> 1. 
Running Flink job by invoke user main method and execute >>> > > throughout, >>> > > > >>> instead of create >>> > > > >>> JobGraph from main-class. >>> > > > >>> 2. Run the client inside the cluster. >>> > > > >>> >>> > > > >>> If 1 and 2 are implemented. There is obvious no need for >>> DETACHED >>> > > mode >>> > > > in >>> > > > >>> cluster side >>> > > > >>> because we just shutdown the cluster on the exit of client that >>> > > running >>> > > > >>> inside cluster. Whether >>> > > > >>> or not delivered the result is up to user code. >>> > > > >>> >>> > > > >>> [1] >>> > > > >>> >>> > > > >> >>> > > > >>> > > >>> > >>> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388 >>> > > > >>> [2] >>> > > > >>> >>> > > > >> >>> > > > >>> > > >>> > >>> https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E >>> > > > >>> >>> > > > >>> >>> > > > >>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午2:13写道: >>> > > > >>> >>> > > > >>>> Thanks for your replies Kostas & Aljoscha! >>> > > > >>>> >>> > > > >>>> Below are replies point by point. >>> > > > >>>> >>> > > > >>>> 1. For DETACHED mode, what I said there is about the DETACHED >>> mode >>> > > in >>> > > > >>>> client side. >>> > > > >>>> There are two configurations overload the item DETACHED[1]. >>> > > > >>>> >>> > > > >>>> In client side, it means whether or not client.submitJob is >>> > blocking >>> > > > to >>> > > > >>>> job execution result. >>> > > > >>>> Due to client.submitJob returns CompletableFuture<JobClient> >>> > > > >> NON-DETACHED >>> > > > >>>> is no >>> > > > >>>> power at all. Caller of submitJob makes the decision whether >>> or >>> > not >>> > > > >>>> blocking to get the >>> > > > >>>> JobClient and request for the job execution result. 
If client >>> > > crashes, >>> > > > >> it >>> > > > >>>> is a user scope >>> > > > >>>> exception that should be handled in user code; if client lost >>> > > > connection >>> > > > >>>> to cluster, we have >>> > > > >>>> a retry times and interval configuration that automatically >>> retry >>> > > and >>> > > > >>>> throws an user scope >>> > > > >>>> exception if exceed. >>> > > > >>>> >>> > > > >>>> Your comment about poll for result or job result sounds like a >>> > > concern >>> > > > >> on >>> > > > >>>> cluster side. >>> > > > >>>> >>> > > > >>>> In cluster side, DETACHED mode is alive only in JobCluster. If >>> > > > DETACHED >>> > > > >>>> configured, >>> > > > >>>> JobCluster exits on job finished; if NON-DETACHED configured, >>> > > > JobCluster >>> > > > >>>> exits on job >>> > > > >>>> execution result delivered. FLIP-74 doesn't stick to changes >>> on >>> > this >>> > > > >>>> scope, it is just remained. >>> > > > >>>> >>> > > > >>>> However, it is an interesting part we can revisit this >>> > > implementation >>> > > > a >>> > > > >>>> bit. >>> > > > >>>> >>> > > > >>>> <see the next email for compact reply in this one> >>> > > > >>>> >>> > > > >>>> 2. The retrieval of JobClient is so important that if we don't >>> > have >>> > > a >>> > > > >> way >>> > > > >>>> to retrieve JobClient it is >>> > > > >>>> a dumb public user-facing interface(what a strange state :P). >>> > > > >>>> >>> > > > >>>> About the retrieval of JobClient, as mentioned in the >>> document, >>> > two >>> > > > ways >>> > > > >>>> should be supported. >>> > > > >>>> >>> > > > >>>> (1). Retrieved as return type of job submission. >>> > > > >>>> (2). Retrieve a JobClient of existing job.(with job id) >>> > > > >>>> >>> > > > >>>> I highly respect your thoughts about how Executors should be >>> and >>> > > > >> thoughts >>> > > > >>>> on multi-layered clients. 
>>> > > > >>>> Although, (2) is not supported by public interfaces as >>> summary of >>> > > > >>>> discussion above, we can discuss >>> > > > >>>> a bit on the place of Executors on multi-layered clients and >>> find >>> > a >>> > > > way >>> > > > >>>> to retrieve JobClient of >>> > > > >>>> existing job with public client API. I will comment in FLIP-73 >>> > > > thread[2] >>> > > > >>>> since it is almost about Executors. >>> > > > >>>> >>> > > > >>>> Best, >>> > > > >>>> tison. >>> > > > >>>> >>> > > > >>>> [1] >>> > > > >>>> >>> > > > >> >>> > > > >>> > > >>> > >>> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8 >>> > > > >>>> [2] >>> > > > >>>> >>> > > > >> >>> > > > >>> > > >>> > >>> https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E >>> > > > >>>> >>> > > > >>>> >>> > > > >>>> >>> > > > >>>> >>> > > > >>>> Kostas Kloudas <[hidden email]> 于2019年9月25日周三 下午9:29写道: >>> > > > >>>> >>> > > > >>>>> Hi Tison, >>> > > > >>>>> >>> > > > >>>>> Thanks for the FLIP and launching the discussion! >>> > > > >>>>> >>> > > > >>>>> As a first note, big +1 on providing/exposing a JobClient to >>> the >>> > > > users! >>> > > > >>>>> >>> > > > >>>>> Some points that would be nice to be clarified: >>> > > > >>>>> 1) You mention that we can get rid of the DETACHED mode: I >>> agree >>> > > that >>> > > > >>>>> at a high level, given that everything will now be >>> asynchronous, >>> > > > there >>> > > > >>>>> is no need to keep the DETACHED mode but I think we should >>> > specify >>> > > > >>>>> some aspects. For example, without the explicit separation >>> of the >>> > > > >>>>> modes, what happens when the job finishes. Does the client >>> > > > >>>>> periodically poll for the result always or the result is >>> pushed >>> > > when >>> > > > >>>>> in NON-DETACHED mode? What happens if the client disconnects >>> and >>> > > > >>>>> reconnects? 
>>> > > > >>>>> >>> > > > >>>>> 2) On the "how to retrieve a JobClient for a running Job", I >>> > think >>> > > > >>>>> this is related to the other discussion you opened in the ML >>> > about >>> > > > >>>>> multi-layered clients. First of all, I agree that exposing >>> > > different >>> > > > >>>>> "levels" of clients would be a nice addition, and actually >>> there >>> > > have >>> > > > >>>>> been some discussions about doing so in the future. Now for >>> this >>> > > > >>>>> specific discussion: >>> > > > >>>>> i) I do not think that we should expose the >>> > > > >>>>> ClusterDescriptor/ClusterSpecification to the user, as this >>> ties >>> > us >>> > > > to >>> > > > >>>>> a specific architecture which may change in the future. >>> > > > >>>>> ii) I do not think it should be the Executor that will >>> > provide >>> > > a >>> > > > >>>>> JobClient for an already running job (only for the Jobs that >>> it >>> > > > >>>>> submits). The job of the executor should just be to >>> execute() a >>> > > > >>>>> pipeline. >>> > > > >>>>> iii) I think a solution that respects the separation of >>> > > concerns >>> > > > >>>>> could be the addition of another component (in the future), >>> > > something >>> > > > >>>>> like a ClientFactory, or ClusterFactory that will have >>> methods >>> > > like: >>> > > > >>>>> ClusterClient createCluster(Configuration), JobClient >>> > > > >>>>> retrieveJobClient(Configuration , JobId), maybe even >>> (although >>> > not >>> > > > >>>>> sure) Executor getExecutor(Configuration ) and maybe more. >>> This >>> > > > >>>>> component would be responsible to interact with a cluster >>> manager >>> > > > like >>> > > > >>>>> Yarn and do what is now being done by the ClusterDescriptor >>> plus >>> > > some >>> > > > >>>>> more stuff. >>> > > > >>>>> >>> > > > >>>>> Although under the hood all these abstractions (Environments, >>> > > > >>>>> Executors, ...) 
underneath use the same clients, I believe >>> their >>> > > > >>>>> job/existence is not contradicting but they simply hide some >>> of >>> > the >>> > > > >>>>> complexity from the user, and give us, as developers some >>> freedom >>> > > to >>> > > > >>>>> change in the future some of the parts. For example, the >>> executor >>> > > > will >>> > > > >>>>> take a Pipeline, create a JobGraph and submit it, instead of >>> > > > requiring >>> > > > >>>>> the user to do each step separately. This allows us to, for >>> > > example, >>> > > > >>>>> get rid of the Plan if in the future everything is >>> DataStream. >>> > > > >>>>> Essentially, I think of these as layers of an onion with the >>> > > clients >>> > > > >>>>> being close to the core. The higher you go, the more >>> > functionality >>> > > is >>> > > > >>>>> included and hidden from the public eye. >>> > > > >>>>> >>> > > > >>>>> Point iii) by the way is just a thought and by no means >>> final. I >>> > > also >>> > > > >>>>> like the idea of multi-layered clients so this may spark up >>> the >>> > > > >>>>> discussion. >>> > > > >>>>> >>> > > > >>>>> Cheers, >>> > > > >>>>> Kostas >>> > > > >>>>> >>> > > > >>>>> On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek < >>> > > > [hidden email]> >>> > > > >>>>> wrote: >>> > > > >>>>>> >>> > > > >>>>>> Hi Tison, >>> > > > >>>>>> >>> > > > >>>>>> Thanks for proposing the document! I had some comments on >>> the >>> > > > >> document. >>> > > > >>>>>> >>> > > > >>>>>> I think the only complex thing that we still need to figure >>> out >>> > is >>> > > > >> how >>> > > > >>>>> to get a JobClient for a job that is already running. As you >>> > > > mentioned >>> > > > >> in >>> > > > >>>>> the document. Currently I’m thinking that its ok to add a >>> method >>> > to >>> > > > >>>>> Executor for retrieving a JobClient for a running job by >>> > providing >>> > > an >>> > > > >> ID. >>> > > > >>>>> Let’s see what Kostas has to say on the topic. 
>>> > > > >>>>>> >>> > > > >>>>>> Best, >>> > > > >>>>>> Aljoscha >>> > > > >>>>>> >>> > > > >>>>>>> On 25. Sep 2019, at 12:31, Zili Chen <[hidden email] >>> > >>> > > wrote: >>> > > > >>>>>>> >>> > > > >>>>>>> Hi all, >>> > > > >>>>>>> >>> > > > >>>>>>> Summary from the discussion about introducing Flink >>> JobClient >>> > > > >> API[1] >>> > > > >>>>> we >>> > > > >>>>>>> draft FLIP-74[2] to >>> > > > >>>>>>> gather thoughts and towards a standard public user-facing >>> > > > >> interfaces. >>> > > > >>>>>>> >>> > > > >>>>>>> This discussion thread aims at standardizing job level >>> client >>> > > API. >>> > > > >>>>> But I'd >>> > > > >>>>>>> like to emphasize that >>> > > > >>>>>>> how to retrieve JobClient possibly causes further >>> discussion on >>> > > > >>>>> different >>> > > > >>>>>>> level clients exposed from >>> > > > >>>>>>> Flink so that a following thread will be started later to >>> > > > >> coordinate >>> > > > >>>>>>> FLIP-73 and FLIP-74 on >>> > > > >>>>>>> expose issue. >>> > > > >>>>>>> >>> > > > >>>>>>> Looking forward to your opinions. >>> > > > >>>>>>> >>> > > > >>>>>>> Best, >>> > > > >>>>>>> tison. 
>>> > > > >>>>>>> >>> > > > >>>>>>> [1] >>> > > > >>>>>>> >>> > > > >>>>> >>> > > > >> >>> > > > >>> > > >>> > >>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E >>> > > > >>>>>>> [2] >>> > > > >>>>>>> >>> > > > >>>>> >>> > > > >> >>> > > > >>> > > >>> > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API >>> > > > >>>>>> >>> > > > >>>>> >>> > > > >>>> >>> > > > >>> > > > >>> > > >>> > > -- >>> > > >>> > > Konstantin Knauf | Solutions Architect >>> > > >>> > > +49 160 91394525 >>> > > >>> > > >>> > > Follow us @VervericaData Ververica <https://www.ververica.com/> >>> > > >>> > > >>> > > -- >>> > > >>> > > Join Flink Forward <https://flink-forward.org/> - The Apache Flink >>> > > Conference >>> > > >>> > > Stream Processing | Event Driven | Real Time >>> > > >>> > > -- >>> > > >>> > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >>> > > >>> > > -- >>> > > Ververica GmbH >>> > > Registered at Amtsgericht Charlottenburg: HRB 158244 B >>> > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, >>> Ji >>> > > (Tony) Cheng >>> > > >>> > >>> >>> >>> -- >>> >>> Konstantin Knauf | Solutions Architect >>> >>> +49 160 91394525 >>> >>> >>> Follow us @VervericaData Ververica <https://www.ververica.com/> >>> >>> >>> -- >>> >>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >>> Conference >>> >>> Stream Processing | Event Driven | Real Time >>> >>> -- >>> >>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >>> >>> -- >>> Ververica GmbH >>> Registered at Amtsgericht Charlottenburg: HRB 158244 B >>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji >>> (Tony) Cheng >>> >> |
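Pulling the proposals in this thread together — every method asynchronous, the deprecated cancelWithSavepoint excluded, and stopWithSavepoint renamed to stop() for consistency with the CLI — the job-level client could look roughly like the sketch below. The shape and the in-memory stub are illustrative only, not the final FLIP-74 API:

```java
import java.util.concurrent.CompletableFuture;

public class JobClientSketch {
    // Illustrative shape of the job-level client discussed in the thread.
    public interface JobClient {
        CompletableFuture<String> getJobStatus();
        CompletableFuture<Void> cancel();
        // stop() always takes a savepoint first (hence no separate
        // stopWithSavepoint) and completes with the savepoint path.
        CompletableFuture<String> stop(String savepointDirectory);
        CompletableFuture<String> triggerSavepoint(String savepointDirectory);
    }

    // Minimal in-memory stub, only so the calling pattern can be shown.
    public static JobClient stubClient(String jobId) {
        return new JobClient() {
            @Override public CompletableFuture<String> getJobStatus() {
                return CompletableFuture.completedFuture("RUNNING");
            }
            @Override public CompletableFuture<Void> cancel() {
                return CompletableFuture.completedFuture(null);
            }
            @Override public CompletableFuture<String> stop(String dir) {
                return CompletableFuture.completedFuture(dir + "/savepoint-" + jobId);
            }
            @Override public CompletableFuture<String> triggerSavepoint(String dir) {
                return CompletableFuture.completedFuture(dir + "/savepoint-" + jobId);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        JobClient client = stubClient("job-1");
        System.out.println(client.getJobStatus().get());  // RUNNING
        System.out.println(client.stop("/tmp/sp").get()); // /tmp/sp/savepoint-job-1
    }
}
```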
s/on the context if/on the context of/
s/dummy/dumb/ |
Hi Tison,
I see. Then I would say that as a first step, and to see if people are happy with the result, integration with the production code can be through a new method executeAsync() in the Executor that we discussed earlier. This method could potentially be exposed on ExecutionEnvironment as a new flavour of execute() that returns a JobClient. In the future we could consider exposing it through a ClusterClientFactory (or something similar). What do you think? Kostas On Thu, Oct 3, 2019 at 10:12 AM Zili Chen <[hidden email]> wrote: > > s/on the context if/on the context of/ > > s/dummy/dumb/ |
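Kostas's suggestion above — an executeAsync() on the Executor, surfaced through the environment as a flavour of execute that returns a JobClient — could be sketched as below. Executor, Pipeline, and JobClient are minimal stand-ins whose signatures are assumed for illustration, not taken from Flink:

```java
import java.util.concurrent.CompletableFuture;

public class ExecuteAsyncSketch {
    // Stand-ins for the concepts in FLIP-73/74; all signatures are assumed.
    interface JobClient { CompletableFuture<String> getJobExecutionResult(); }
    interface Pipeline { String name(); }

    interface Executor {
        // Today's blocking behaviour, expressed on top of the async flavour.
        default String execute(Pipeline pipeline) throws Exception {
            return executeAsync(pipeline).get().getJobExecutionResult().get();
        }
        // The proposed addition: submit and hand back a JobClient handle.
        CompletableFuture<JobClient> executeAsync(Pipeline pipeline);
    }

    // Trivial local executor, only to make the sketch runnable.
    static final Executor LOCAL = pipeline -> {
        JobClient client = () ->
                CompletableFuture.completedFuture(pipeline.name() + ": FINISHED");
        return CompletableFuture.completedFuture(client);
    };

    public static void main(String[] args) throws Exception {
        Pipeline p = () -> "wordcount";
        // Blocking flavour (today's env.execute()).
        System.out.println(LOCAL.execute(p)); // wordcount: FINISHED
        // Asynchronous flavour returning a JobClient handle.
        JobClient handle = LOCAL.executeAsync(p).get();
        System.out.println(handle.getJobExecutionResult().get()); // wordcount: FINISHED
    }
}
```

One nice property of this shape is that the blocking execute can be a default method over executeAsync, so only the asynchronous path needs a real implementation.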
Hi Kostas,
That is exactly how things go in my mind and the reason I say "integration to be a follow up discussion" :-) Best, tison. Kostas Kloudas <[hidden email]> 于2019年10月3日周四 下午6:23写道: > Hi Tison, > > I see. Then I would say that as a first step, and to see if people are > happy with the result, integration with the production code can be > through a new method executeAsync() in the Executor that we discussed > earlier. [...] |
This makes sense to me, yes!
> On 2. Oct 2019, at 20:35, Zili Chen <[hidden email]> wrote: > > Hi all, > > Narrow the scope to FLIP-74 we aimed at introduce a useful and extensible > user-facing public interface JobClient. Let me reemphasize two major works > under this thread. > > 1. standard interface > > As in FLIP-74 we introduce an interface JobClient with its methods, we'd > like to > make it a standard (non-final since we can always extends on demand) > interface. > > On this branch I'd like to, with respect to Konstantin's suggestion, 1) > exclude deprecated > cancelWithSavepoint from the proposal 2) rename stopWithSavepoint to stop > to keep > consistency with our CLI command. If there is no more concern on these > topics I will > update proposal tomorrow. > > 2. client interfaces are asynchronous > > If the asynchronous JobClient interfaces approved, a necessary proposed > changed is > corresponding update ClusterClient interfaces. Still ClusterClient is an > internal concept > after this FLIP but it might have some impact so I think it's better to > reach a community > consensus as prerequisite. Note that with all client methods are > asynchronous, no matter > whether or not we remove client side detach option it is no power. > > Let me know your ideas on these topic and keep moving forward :-) > > Best, > tison. > > > Zili Chen <[hidden email]> 于2019年10月2日周三 下午4:10写道: > >> Hi Konstantin, >> >> * should we add "cancelWithSavepeoint" to a new public API, when we have >> deprecated the corresponding REST API/CLI methods? In my understanding >> there is no reason to use it anymore. >> >> Good point. We can exclude "cancelWithSavepoint" from public API at least >> for now, >> since it is deprecated already. Let's see if there is other concerns. >> >> * should we call "stopWithSavepoint" simply "stop" as "stop" always >> performs a savepoint? >> >> Well for naming issue I'm fine with that if it is a consensus of our >> community. 
I can see >> there is a "stop" CLI command which means "stop with savepoint". >> >> Best, >> tison. >> >> >> Konstantin Knauf <[hidden email]> 于2019年9月30日周一 下午12:16写道: >> >>> Hi Thomas, >>> >>> maybe there is a misunderstanding. There is no plan to deprecate anything >>> in the REST API in the process of introducing the JobClient API, and it >>> shouldn't. >>> >>> Since "cancel with savepoint" was already deprecated in the REST API and >>> CLI, I am just raising the question whether to add it to the JobClient API >>> in the first place. >>> >>> Best, >>> >>> Konstantin >>> >>> >>> >>> On Mon, Sep 30, 2019 at 1:16 AM Thomas Weise <[hidden email]> wrote: >>> >>>> I did not realize there was a plan to deprecate anything in the REST >>> API? >>>> >>>> The REST API is super important for tooling written in non JVM >>> languages, >>>> that does not include a Flink client (like FlinkK8sOperator). The REST >>> API >>>> should continue to support all job management operations, including job >>>> submission. >>>> >>>> Thomas >>>> >>>> >>>> On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf < >>> [hidden email] >>>>> >>>> wrote: >>>> >>>>> Hi Zili, >>>>> >>>>> thanks for working on this topic. Just read through the FLIP and I >>> have >>>> two >>>>> questions: >>>>> >>>>> * should we add "cancelWithSavepeoint" to a new public API, when we >>> have >>>>> deprecated the corresponding REST API/CLI methods? In my understanding >>>>> there is no reason to use it anymore. >>>>> * should we call "stopWithSavepoint" simply "stop" as "stop" always >>>>> performs a savepoint? >>>>> >>>>> Best, >>>>> >>>>> Konstantin >>>>> >>>>> >>>>> >>>>> On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek < >>> [hidden email]> >>>>> wrote: >>>>> >>>>>> Hi Flavio, >>>>>> >>>>>> I agree that this would be good to have. But I also think that this >>> is >>>>>> outside the scope of FLIP-74, I think it is an orthogonal feature. >>>>>> >>>>>> Best, >>>>>> Aljoscha >>>>>> >>>>>>> On 27. 
Sep 2019, at 10:31, Flavio Pompermaier < >>> [hidden email]> >>>>>> wrote: >>>>>>> >>>>>>> Hi all, >>>>>>> just a remark about the Flink REST APIs (and its client as well): >>>>> almost >>>>>>> all the times we need a way to dynamically know which jobs are >>>>> contained >>>>>> in >>>>>>> a jar file, and this could be exposed by the REST endpoint under >>>>>>> /jars/:jarid/entry-points (a simple way to implement this would >>> be to >>>>>> check >>>>>>> the value of Main-class or Main-classes inside the Manifest of the >>>> jar >>>>> if >>>>>>> they exists [1]). >>>>>>> >>>>>>> I understand that this is something that is not strictly required >>> to >>>>>>> execute Flink jobs but IMHO it would ease A LOT the work of UI >>>>> developers >>>>>>> that could have a way to show the users all available jobs inside >>> a >>>>> jar + >>>>>>> their configurable parameters. >>>>>>> For example, right now in the WebUI, you can upload a jar and then >>>> you >>>>>> have >>>>>>> to set (without any autocomplete or UI support) the main class and >>>>> their >>>>>>> params (for example using a string like --param1 xx --param2 yy). >>>>>>> Adding this functionality to the REST API and the respective >>> client >>>>> would >>>>>>> enable the WebUI (and all UIs interacting with a Flink cluster) to >>>>>> prefill >>>>>>> a dropdown list containing the list of entry-point classes (i.e. >>>> Flink >>>>>>> jobs) and, once selected, their required (typed) parameters. >>>>>>> >>>>>>> Best, >>>>>>> Flavio >>>>>>> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-10864 >>>>>>> >>>>>>> On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]> >>>>> wrote: >>>>>>> >>>>>>>> modify >>>>>>>> >>>>>>>> /we just shutdown the cluster on the exit of client that running >>>>> inside >>>>>>>> cluster/ >>>>>>>> >>>>>>>> to >>>>>>>> >>>>>>>> we just shutdown the cluster on both the exit of client that >>> running >>>>>> inside >>>>>>>> cluster and the finish of job. 
>>>>>>>> Since client is running inside cluster we can easily wait for the >>>> end >>>>> of >>>>>>>> two both in ClusterEntrypoint. >>>>>>>> >>>>>>>> >>>>>>>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午3:13写道: >>>>>>>> >>>>>>>>> About JobCluster >>>>>>>>> >>>>>>>>> Actually I am not quite sure what we gains from DETACHED >>>>> configuration >>>>>> on >>>>>>>>> cluster side. >>>>>>>>> We don't have a NON-DETACHED JobCluster in fact in our codebase, >>>>> right? >>>>>>>>> >>>>>>>>> It comes to me one major questions we have to answer first. >>>>>>>>> >>>>>>>>> *What JobCluster conceptually is exactly* >>>>>>>>> >>>>>>>>> Related discussion can be found in JIRA[1] and mailing list[2]. >>>>> Stephan >>>>>>>>> gives a nice >>>>>>>>> description of JobCluster: >>>>>>>>> >>>>>>>>> Two things to add: - The job mode is very nice in the way that >>> it >>>>> runs >>>>>>>> the >>>>>>>>> client inside the cluster (in the same image/process that is the >>>> JM) >>>>>> and >>>>>>>>> thus unifies both applications and what the Spark world calls >>> the >>>>>> "driver >>>>>>>>> mode". - Another thing I would add is that during the FLIP-6 >>>> design, >>>>> we >>>>>>>>> were thinking about setups where Dispatcher and JobManager are >>>>> separate >>>>>>>>> processes. A Yarn or Mesos Dispatcher of a session could run >>>>>>>> independently >>>>>>>>> (even as privileged processes executing no code). Then you the >>>>>> "per-job" >>>>>>>>> mode could still be helpful: when a job is submitted to the >>>>> dispatcher, >>>>>>>> it >>>>>>>>> launches the JM again in a per-job mode, so that JM and TM >>>> processes >>>>>> are >>>>>>>>> bound to teh job only. For higher security setups, it is >>> important >>>>> that >>>>>>>>> processes are not reused across jobs. >>>>>>>>> >>>>>>>>> However, currently in "per-job" mode we generate JobGraph in >>> client >>>>>> side, >>>>>>>>> launching >>>>>>>>> the JobCluster and retrieve the JobGraph for execution. 
So >>>> actually, >>>>> we >>>>>>>>> don't "run the >>>>>>>>> client inside the cluster". >>>>>>>>> >>>>>>>>> Besides, refer to the discussion with Till[1], it would be >>> helpful >>>> we >>>>>>>>> follow the same process >>>>>>>>> of session mode for that of "per-job" mode in user perspective, >>>> that >>>>> we >>>>>>>>> don't use >>>>>>>>> OptimizedPlanEnvironment to create JobGraph, but directly deploy >>>>> Flink >>>>>>>>> cluster in env.execute. >>>>>>>>> >>>>>>>>> Generally 2 points >>>>>>>>> >>>>>>>>> 1. Running Flink job by invoke user main method and execute >>>>> throughout, >>>>>>>>> instead of create >>>>>>>>> JobGraph from main-class. >>>>>>>>> 2. Run the client inside the cluster. >>>>>>>>> >>>>>>>>> If 1 and 2 are implemented. There is obvious no need for >>> DETACHED >>>>> mode >>>>>> in >>>>>>>>> cluster side >>>>>>>>> because we just shutdown the cluster on the exit of client that >>>>> running >>>>>>>>> inside cluster. Whether >>>>>>>>> or not delivered the result is up to user code. >>>>>>>>> >>>>>>>>> [1] >>>>>>>>> >>>>>>>> >>>>>> >>>>> >>>> >>> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388 >>>>>>>>> [2] >>>>>>>>> >>>>>>>> >>>>>> >>>>> >>>> >>> https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E >>>>>>>>> >>>>>>>>> >>>>>>>>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午2:13写道: >>>>>>>>> >>>>>>>>>> Thanks for your replies Kostas & Aljoscha! >>>>>>>>>> >>>>>>>>>> Below are replies point by point. >>>>>>>>>> >>>>>>>>>> 1. For DETACHED mode, what I said there is about the DETACHED >>> mode >>>>> in >>>>>>>>>> client side. >>>>>>>>>> There are two configurations overload the item DETACHED[1]. >>>>>>>>>> >>>>>>>>>> In client side, it means whether or not client.submitJob is >>>> blocking >>>>>> to >>>>>>>>>> job execution result. 
>>>>>>>>>> Due to client.submitJob returns CompletableFuture<JobClient> >>>>>>>> NON-DETACHED >>>>>>>>>> is no >>>>>>>>>> power at all. Caller of submitJob makes the decision whether or >>>> not >>>>>>>>>> blocking to get the >>>>>>>>>> JobClient and request for the job execution result. If client >>>>> crashes, >>>>>>>> it >>>>>>>>>> is a user scope >>>>>>>>>> exception that should be handled in user code; if client lost >>>>>> connection >>>>>>>>>> to cluster, we have >>>>>>>>>> a retry times and interval configuration that automatically >>> retry >>>>> and >>>>>>>>>> throws an user scope >>>>>>>>>> exception if exceed. >>>>>>>>>> >>>>>>>>>> Your comment about poll for result or job result sounds like a >>>>> concern >>>>>>>> on >>>>>>>>>> cluster side. >>>>>>>>>> >>>>>>>>>> In cluster side, DETACHED mode is alive only in JobCluster. If >>>>>> DETACHED >>>>>>>>>> configured, >>>>>>>>>> JobCluster exits on job finished; if NON-DETACHED configured, >>>>>> JobCluster >>>>>>>>>> exits on job >>>>>>>>>> execution result delivered. FLIP-74 doesn't stick to changes on >>>> this >>>>>>>>>> scope, it is just remained. >>>>>>>>>> >>>>>>>>>> However, it is an interesting part we can revisit this >>>>> implementation >>>>>> a >>>>>>>>>> bit. >>>>>>>>>> >>>>>>>>>> <see the next email for compact reply in this one> >>>>>>>>>> >>>>>>>>>> 2. The retrieval of JobClient is so important that if we don't >>>> have >>>>> a >>>>>>>> way >>>>>>>>>> to retrieve JobClient it is >>>>>>>>>> a dumb public user-facing interface(what a strange state :P). >>>>>>>>>> >>>>>>>>>> About the retrieval of JobClient, as mentioned in the document, >>>> two >>>>>> ways >>>>>>>>>> should be supported. >>>>>>>>>> >>>>>>>>>> (1). Retrieved as return type of job submission. >>>>>>>>>> (2). Retrieve a JobClient of existing job.(with job id) >>>>>>>>>> >>>>>>>>>> I highly respect your thoughts about how Executors should be >>> and >>>>>>>> thoughts >>>>>>>>>> on multi-layered clients. 
>>>>>>>>>> Although, (2) is not supported by public interfaces as summary >>> of >>>>>>>>>> discussion above, we can discuss >>>>>>>>>> a bit on the place of Executors on multi-layered clients and >>> find >>>> a >>>>>> way >>>>>>>>>> to retrieve JobClient of >>>>>>>>>> existing job with public client API. I will comment in FLIP-73 >>>>>> thread[2] >>>>>>>>>> since it is almost about Executors. >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> tison. >>>>>>>>>> >>>>>>>>>> [1] >>>>>>>>>> >>>>>>>> >>>>>> >>>>> >>>> >>> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8 >>>>>>>>>> [2] >>>>>>>>>> >>>>>>>> >>>>>> >>>>> >>>> >>> https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Kostas Kloudas <[hidden email]> 于2019年9月25日周三 下午9:29写道: >>>>>>>>>> >>>>>>>>>>> Hi Tison, >>>>>>>>>>> >>>>>>>>>>> Thanks for the FLIP and launching the discussion! >>>>>>>>>>> >>>>>>>>>>> As a first note, big +1 on providing/exposing a JobClient to >>> the >>>>>> users! >>>>>>>>>>> >>>>>>>>>>> Some points that would be nice to be clarified: >>>>>>>>>>> 1) You mention that we can get rid of the DETACHED mode: I >>> agree >>>>> that >>>>>>>>>>> at a high level, given that everything will now be >>> asynchronous, >>>>>> there >>>>>>>>>>> is no need to keep the DETACHED mode but I think we should >>>> specify >>>>>>>>>>> some aspects. For example, without the explicit separation of >>> the >>>>>>>>>>> modes, what happens when the job finishes. Does the client >>>>>>>>>>> periodically poll for the result always or the result is >>> pushed >>>>> when >>>>>>>>>>> in NON-DETACHED mode? What happens if the client disconnects >>> and >>>>>>>>>>> reconnects? 
>>>>>>>>>>> >>>>>>>>>>> 2) On the "how to retrieve a JobClient for a running Job", I >>>> think >>>>>>>>>>> this is related to the other discussion you opened in the ML >>>> about >>>>>>>>>>> multi-layered clients. First of all, I agree that exposing >>>>> different >>>>>>>>>>> "levels" of clients would be a nice addition, and actually >>> there >>>>> have >>>>>>>>>>> been some discussions about doing so in the future. Now for >>> this >>>>>>>>>>> specific discussion: >>>>>>>>>>> i) I do not think that we should expose the >>>>>>>>>>> ClusterDescriptor/ClusterSpecification to the user, as this >>> ties >>>> us >>>>>> to >>>>>>>>>>> a specific architecture which may change in the future. >>>>>>>>>>> ii) I do not think it should be the Executor that will >>>> provide >>>>> a >>>>>>>>>>> JobClient for an already running job (only for the Jobs that >>> it >>>>>>>>>>> submits). The job of the executor should just be to execute() >>> a >>>>>>>>>>> pipeline. >>>>>>>>>>> iii) I think a solution that respects the separation of >>>>> concerns >>>>>>>>>>> could be the addition of another component (in the future), >>>>> something >>>>>>>>>>> like a ClientFactory, or ClusterFactory that will have methods >>>>> like: >>>>>>>>>>> ClusterClient createCluster(Configuration), JobClient >>>>>>>>>>> retrieveJobClient(Configuration , JobId), maybe even (although >>>> not >>>>>>>>>>> sure) Executor getExecutor(Configuration ) and maybe more. >>> This >>>>>>>>>>> component would be responsible to interact with a cluster >>> manager >>>>>> like >>>>>>>>>>> Yarn and do what is now being done by the ClusterDescriptor >>> plus >>>>> some >>>>>>>>>>> more stuff. >>>>>>>>>>> >>>>>>>>>>> Although under the hood all these abstractions (Environments, >>>>>>>>>>> Executors, ...) 
underneath use the same clients, I believe >>> their >>>>>>>>>>> job/existence is not contradicting but they simply hide some >>> of >>>> the >>>>>>>>>>> complexity from the user, and give us, as developers some >>> freedom >>>>> to >>>>>>>>>>> change in the future some of the parts. For example, the >>> executor >>>>>> will >>>>>>>>>>> take a Pipeline, create a JobGraph and submit it, instead of >>>>>> requiring >>>>>>>>>>> the user to do each step separately. This allows us to, for >>>>> example, >>>>>>>>>>> get rid of the Plan if in the future everything is DataStream. >>>>>>>>>>> Essentially, I think of these as layers of an onion with the >>>>> clients >>>>>>>>>>> being close to the core. The higher you go, the more >>>> functionality >>>>> is >>>>>>>>>>> included and hidden from the public eye. >>>>>>>>>>> >>>>>>>>>>> Point iii) by the way is just a thought and by no means >>> final. I >>>>> also >>>>>>>>>>> like the idea of multi-layered clients so this may spark up >>> the >>>>>>>>>>> discussion. >>>>>>>>>>> >>>>>>>>>>> Cheers, >>>>>>>>>>> Kostas >>>>>>>>>>> >>>>>>>>>>> On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek < >>>>>> [hidden email]> >>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Hi Tison, >>>>>>>>>>>> >>>>>>>>>>>> Thanks for proposing the document! I had some comments on the >>>>>>>> document. >>>>>>>>>>>> >>>>>>>>>>>> I think the only complex thing that we still need to figure >>> out >>>> is >>>>>>>> how >>>>>>>>>>> to get a JobClient for a job that is already running. As you >>>>>> mentioned >>>>>>>> in >>>>>>>>>>> the document. Currently I’m thinking that its ok to add a >>> method >>>> to >>>>>>>>>>> Executor for retrieving a JobClient for a running job by >>>> providing >>>>> an >>>>>>>> ID. >>>>>>>>>>> Let’s see what Kostas has to say on the topic. >>>>>>>>>>>> >>>>>>>>>>>> Best, >>>>>>>>>>>> Aljoscha >>>>>>>>>>>> >>>>>>>>>>>>> On 25. 
Sep 2019, at 12:31, Zili Chen <[hidden email]> >>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> Hi all, >>>>>>>>>>>>> >>>>>>>>>>>>> Summary from the discussion about introducing Flink >>> JobClient >>>>>>>> API[1] >>>>>>>>>>> we >>>>>>>>>>>>> draft FLIP-74[2] to >>>>>>>>>>>>> gather thoughts and towards a standard public user-facing >>>>>>>> interfaces. >>>>>>>>>>>>> >>>>>>>>>>>>> This discussion thread aims at standardizing job level >>> client >>>>> API. >>>>>>>>>>> But I'd >>>>>>>>>>>>> like to emphasize that >>>>>>>>>>>>> how to retrieve JobClient possibly causes further >>> discussion on >>>>>>>>>>> different >>>>>>>>>>>>> level clients exposed from >>>>>>>>>>>>> Flink so that a following thread will be started later to >>>>>>>> coordinate >>>>>>>>>>>>> FLIP-73 and FLIP-74 on >>>>>>>>>>>>> expose issue. >>>>>>>>>>>>> >>>>>>>>>>>>> Looking forward to your opinions. >>>>>>>>>>>>> >>>>>>>>>>>>> Best, >>>>>>>>>>>>> tison. >>>>>>>>>>>>> >>>>>>>>>>>>> [1] >>>>>>>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>> >>>>> >>>> >>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E >>>>>>>>>>>>> [2] >>>>>>>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>> >>>>> >>>> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>> >>>>>> >>>>> >>>>> -- >>>>> >>>>> Konstantin Knauf | Solutions Architect >>>>> >>>>> +49 160 91394525 >>>>> >>>>> >>>>> Follow us @VervericaData Ververica <https://www.ververica.com/> >>>>> >>>>> >>>>> -- >>>>> >>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >>>>> Conference >>>>> >>>>> Stream Processing | Event Driven | Real Time >>>>> >>>>> -- >>>>> >>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >>>>> >>>>> -- >>>>> Ververica GmbH >>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B >>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, >>> Ji >>>>> (Tony) Cheng >>>>> >>>> >>> 
>>> >> |
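Collecting the two points from the quoted summary (a standard interface, and fully asynchronous methods), the surface under discussion could be sketched roughly as follows. The method names, the JobStatus enum, and the stub are illustrative assumptions only; note that stop replaces stopWithSavepoint to match the CLI, and the deprecated cancel-with-savepoint variant is left out:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative only -- not the final FLIP-74 interface.
enum JobStatus { CREATED, RUNNING, FINISHED, CANCELED, FAILED }

interface JobClient {
    String getJobId();

    // Every method is asynchronous. With CompletableFuture results a
    // client-side DETACHED option has "no power": blocking becomes a
    // call-site decision (join()) rather than a submission-time mode.
    CompletableFuture<JobStatus> getJobStatus();
    CompletableFuture<Void> cancel();

    // "stop" always takes a savepoint, consistent with the stop CLI
    // command; completes with the savepoint path.
    CompletableFuture<String> stop(String targetDirectory);
}

// No-op stub so the sketch is runnable.
class StubJobClient implements JobClient {
    @Override public String getJobId() { return "stub-job"; }
    @Override public CompletableFuture<JobStatus> getJobStatus() {
        return CompletableFuture.completedFuture(JobStatus.RUNNING);
    }
    @Override public CompletableFuture<Void> cancel() {
        return CompletableFuture.completedFuture(null);
    }
    @Override public CompletableFuture<String> stop(String targetDirectory) {
        return CompletableFuture.completedFuture(targetDirectory + "/savepoint-1");
    }
}
```

The ClusterClient changes mentioned in the summary would mirror this shape, while ClusterClient itself stays an internal concept and JobClient remains the public face.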
I also agree, @Zili Chen!
On Fri, Oct 4, 2019 at 10:17 AM Aljoscha Krettek <[hidden email]> wrote: > > This makes sense to me, yes! [...] |
Now for > >>> this > >>>>>>>>>>> specific discussion: > >>>>>>>>>>> i) I do not think that we should expose the > >>>>>>>>>>> ClusterDescriptor/ClusterSpecification to the user, as this > >>> ties > >>>> us > >>>>>> to > >>>>>>>>>>> a specific architecture which may change in the future. > >>>>>>>>>>> ii) I do not think it should be the Executor that will > >>>> provide > >>>>> a > >>>>>>>>>>> JobClient for an already running job (only for the Jobs that > >>> it > >>>>>>>>>>> submits). The job of the executor should just be to execute() > >>> a > >>>>>>>>>>> pipeline. > >>>>>>>>>>> iii) I think a solution that respects the separation of > >>>>> concerns > >>>>>>>>>>> could be the addition of another component (in the future), > >>>>> something > >>>>>>>>>>> like a ClientFactory, or ClusterFactory that will have methods > >>>>> like: > >>>>>>>>>>> ClusterClient createCluster(Configuration), JobClient > >>>>>>>>>>> retrieveJobClient(Configuration , JobId), maybe even (although > >>>> not > >>>>>>>>>>> sure) Executor getExecutor(Configuration ) and maybe more. > >>> This > >>>>>>>>>>> component would be responsible to interact with a cluster > >>> manager > >>>>>> like > >>>>>>>>>>> Yarn and do what is now being done by the ClusterDescriptor > >>> plus > >>>>> some > >>>>>>>>>>> more stuff. > >>>>>>>>>>> > >>>>>>>>>>> Although under the hood all these abstractions (Environments, > >>>>>>>>>>> Executors, ...) underneath use the same clients, I believe > >>> their > >>>>>>>>>>> job/existence is not contradicting but they simply hide some > >>> of > >>>> the > >>>>>>>>>>> complexity from the user, and give us, as developers some > >>> freedom > >>>>> to > >>>>>>>>>>> change in the future some of the parts. For example, the > >>> executor > >>>>>> will > >>>>>>>>>>> take a Pipeline, create a JobGraph and submit it, instead of > >>>>>> requiring > >>>>>>>>>>> the user to do each step separately. 
This allows us to, for > >>>>> example, > >>>>>>>>>>> get rid of the Plan if in the future everything is DataStream. > >>>>>>>>>>> Essentially, I think of these as layers of an onion with the > >>>>> clients > >>>>>>>>>>> being close to the core. The higher you go, the more > >>>> functionality > >>>>> is > >>>>>>>>>>> included and hidden from the public eye. > >>>>>>>>>>> > >>>>>>>>>>> Point iii) by the way is just a thought and by no means > >>> final. I > >>>>> also > >>>>>>>>>>> like the idea of multi-layered clients so this may spark up > >>> the > >>>>>>>>>>> discussion. > >>>>>>>>>>> > >>>>>>>>>>> Cheers, > >>>>>>>>>>> Kostas > >>>>>>>>>>> > >>>>>>>>>>> On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek < > >>>>>> [hidden email]> > >>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>> Hi Tison, > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks for proposing the document! I had some comments on the > >>>>>>>> document. > >>>>>>>>>>>> > >>>>>>>>>>>> I think the only complex thing that we still need to figure > >>> out > >>>> is > >>>>>>>> how > >>>>>>>>>>> to get a JobClient for a job that is already running. As you > >>>>>> mentioned > >>>>>>>> in > >>>>>>>>>>> the document. Currently I’m thinking that its ok to add a > >>> method > >>>> to > >>>>>>>>>>> Executor for retrieving a JobClient for a running job by > >>>> providing > >>>>> an > >>>>>>>> ID. > >>>>>>>>>>> Let’s see what Kostas has to say on the topic. > >>>>>>>>>>>> > >>>>>>>>>>>> Best, > >>>>>>>>>>>> Aljoscha > >>>>>>>>>>>> > >>>>>>>>>>>>> On 25. Sep 2019, at 12:31, Zili Chen <[hidden email]> > >>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Summary from the discussion about introducing Flink > >>> JobClient > >>>>>>>> API[1] > >>>>>>>>>>> we > >>>>>>>>>>>>> draft FLIP-74[2] to > >>>>>>>>>>>>> gather thoughts and towards a standard public user-facing > >>>>>>>> interfaces. 
> >>>>>>>>>>>>> > >>>>>>>>>>>>> This discussion thread aims at standardizing job level > >>> client > >>>>> API. > >>>>>>>>>>> But I'd > >>>>>>>>>>>>> like to emphasize that > >>>>>>>>>>>>> how to retrieve JobClient possibly causes further > >>> discussion on > >>>>>>>>>>> different > >>>>>>>>>>>>> level clients exposed from > >>>>>>>>>>>>> Flink so that a following thread will be started later to > >>>>>>>> coordinate > >>>>>>>>>>>>> FLIP-73 and FLIP-74 on > >>>>>>>>>>>>> expose issue. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Looking forward to your opinions. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Best, > >>>>>>>>>>>>> tison. > >>>>>>>>>>>>> > >>>>>>>>>>>>> [1] > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>> > >>>>>> > >>>>> > >>>> > >>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E > >>>>>>>>>>>>> [2] > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>> > >>>>>> > >>>>> > >>>> > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>> > >>>>>> > >>>>> > >>>>> -- > >>>>> > >>>>> Konstantin Knauf | Solutions Architect > >>>>> > >>>>> +49 160 91394525 > >>>>> > >>>>> > >>>>> Follow us @VervericaData Ververica <https://www.ververica.com/> > >>>>> > >>>>> > >>>>> -- > >>>>> > >>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink > >>>>> Conference > >>>>> > >>>>> Stream Processing | Event Driven | Real Time > >>>>> > >>>>> -- > >>>>> > >>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > >>>>> > >>>>> -- > >>>>> Ververica GmbH > >>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B > >>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, > >>> Ji > >>>>> (Tony) Cheng > >>>>> > >>>> > >>> > >>> > >>> -- > >>> > >>> Konstantin Knauf | Solutions Architect > >>> > >>> +49 160 91394525 > >>> > >>> > >>> Follow us @VervericaData Ververica <https://www.ververica.com/> > >>> > >>> > >>> -- > >>> > 
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink > >>> Conference > >>> > >>> Stream Processing | Event Driven | Real Time > >>> > >>> -- > >>> > >>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > >>> > >>> -- > >>> Ververica GmbH > >>> Registered at Amtsgericht Charlottenburg: HRB 158244 B > >>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji > >>> (Tony) Cheng > >>> > >> > |
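The client-side point made in the quoted message above — that once submitJob returns CompletableFuture&lt;JobClient&gt;, a NON-DETACHED option has "no power" because the caller alone decides whether to block on the result — can be sketched as follows. Note this is a hypothetical mock of the proposed interfaces (MockClusterClient and the result type are assumptions for illustration), not the actual Flink API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of FLIP-74-style interfaces; the names and
// signatures here are illustrative assumptions, not the real Flink API.
interface JobClient {
    String getJobId();
    CompletableFuture<String> getJobExecutionResult();
}

class MockClusterClient {
    // submitJob is asynchronous: it completes with a JobClient handle.
    CompletableFuture<JobClient> submitJob(String jobName) {
        CompletableFuture<String> result =
                CompletableFuture.supplyAsync(() -> jobName + ": FINISHED");
        JobClient client = new JobClient() {
            public String getJobId() { return "job-" + jobName; }
            public CompletableFuture<String> getJobExecutionResult() { return result; }
        };
        return CompletableFuture.completedFuture(client);
    }
}

public class SubmitDemo {
    public static void main(String[] args) throws Exception {
        MockClusterClient cluster = new MockClusterClient();

        // "Detached" usage: keep the handle, do not block on the result.
        JobClient handle = cluster.submitJob("wordcount").get();
        System.out.println("submitted " + handle.getJobId());

        // "Attached" usage: the caller, not a cluster-side mode, decides
        // to block until the job execution result arrives.
        System.out.println(handle.getJobExecutionResult().get());
    }
}
```

With this shape, a separate client-side DETACHED configuration adds nothing: both styles above use the same submitJob call.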
Thanks for your replies.
Since there has been no objection to Konstantin's proposal so far, I'd like to update the FLIP accordingly. The changes are the naming issue and the exclusion of deprecated functionality. I hereby infer that our community generally agrees on the introduction of the JobClient and its interfaces as proposed in the FLIP. If there are other concerns, please throw them into this thread. Otherwise I'm going to start a vote thread later. Best, tison. Kostas Kloudas <[hidden email]> 于2019年10月4日周五 下午11:33写道: > I also agree @Zili Chen ! > > On Fri, Oct 4, 2019 at 10:17 AM Aljoscha Krettek <[hidden email]> > wrote: > > > > This makes sense to me, yes! > > > > > On 2. Oct 2019, at 20:35, Zili Chen <[hidden email]> wrote: > > > > > > Hi all, > > > > > > Narrow the scope to FLIP-74 we aimed at introduce a useful and > extensible > > > user-facing public interface JobClient. Let me reemphasize two major > works > > > under this thread. > > > > > > 1. standard interface > > > > > > As in FLIP-74 we introduce an interface JobClient with its methods, > we'd > > > like to > > > make it a standard (non-final since we can always extends on demand) > > > interface. > > > > > > On this branch I'd like to, with respect to Konstantin's suggestion, 1) > > > exclude deprecated > > > cancelWithSavepoint from the proposal 2) rename stopWithSavepoint to > stop > > > to keep > > > consistency with our CLI command. If there is no more concern on these > > > topics I will > > > update proposal tomorrow. > > > > > > 2. client interfaces are asynchronous > > > > > > If the asynchronous JobClient interfaces approved, a necessary proposed > > > changed is > > > corresponding update ClusterClient interfaces. Still ClusterClient is > an > > > internal concept > > > after this FLIP but it might have some impact so I think it's better to > > > reach a community > > > consensus as prerequisite. Note that with all client methods are > > > asynchronous, no matter > > > whether or not we remove client side detach option it is no power. 
> > > > > > Let me know your ideas on these topic and keep moving forward :-) > > > > > > Best, > > > tison. > > > > > > > > > Zili Chen <[hidden email]> 于2019年10月2日周三 下午4:10写道: > > > > > >> Hi Konstantin, > > >> > > >> * should we add "cancelWithSavepeoint" to a new public API, when we > have > > >> deprecated the corresponding REST API/CLI methods? In my understanding > > >> there is no reason to use it anymore. > > >> > > >> Good point. We can exclude "cancelWithSavepoint" from public API at > least > > >> for now, > > >> since it is deprecated already. Let's see if there is other concerns. > > >> > > >> * should we call "stopWithSavepoint" simply "stop" as "stop" always > > >> performs a savepoint? > > >> > > >> Well for naming issue I'm fine with that if it is a consensus of our > > >> community. I can see > > >> there is a "stop" CLI command which means "stop with savepoint". > > >> > > >> Best, > > >> tison. > > >> > > >> > > >> Konstantin Knauf <[hidden email]> 于2019年9月30日周一 下午12:16写道: > > >> > > >>> Hi Thomas, > > >>> > > >>> maybe there is a misunderstanding. There is no plan to deprecate > anything > > >>> in the REST API in the process of introducing the JobClient API, and > it > > >>> shouldn't. > > >>> > > >>> Since "cancel with savepoint" was already deprecated in the REST API > and > > >>> CLI, I am just raising the question whether to add it to the > JobClient API > > >>> in the first place. > > >>> > > >>> Best, > > >>> > > >>> Konstantin > > >>> > > >>> > > >>> > > >>> On Mon, Sep 30, 2019 at 1:16 AM Thomas Weise <[hidden email]> wrote: > > >>> > > >>>> I did not realize there was a plan to deprecate anything in the REST > > >>> API? > > >>>> > > >>>> The REST API is super important for tooling written in non JVM > > >>> languages, > > >>>> that does not include a Flink client (like FlinkK8sOperator). The > REST > > >>> API > > >>>> should continue to support all job management operations, including > job > > >>>> submission. 
> > >>>> > > >>>> Thomas > > >>>> > > >>>> > > >>>> On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf < > > >>> [hidden email] > > >>>>> > > >>>> wrote: > > >>>> > > >>>>> Hi Zili, > > >>>>> > > >>>>> thanks for working on this topic. Just read through the FLIP and I > > >>> have > > >>>> two > > >>>>> questions: > > >>>>> > > >>>>> * should we add "cancelWithSavepeoint" to a new public API, when we > > >>> have > > >>>>> deprecated the corresponding REST API/CLI methods? In my > understanding > > >>>>> there is no reason to use it anymore. > > >>>>> * should we call "stopWithSavepoint" simply "stop" as "stop" always > > >>>>> performs a savepoint? > > >>>>> > > >>>>> Best, > > >>>>> > > >>>>> Konstantin > > >>>>> > > >>>>> > > >>>>> > > >>>>> On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek < > > >>> [hidden email]> > > >>>>> wrote: > > >>>>> > > >>>>>> Hi Flavio, > > >>>>>> > > >>>>>> I agree that this would be good to have. But I also think that > this > > >>> is > > >>>>>> outside the scope of FLIP-74, I think it is an orthogonal feature. > > >>>>>> > > >>>>>> Best, > > >>>>>> Aljoscha > > >>>>>> > > >>>>>>> On 27. Sep 2019, at 10:31, Flavio Pompermaier < > > >>> [hidden email]> > > >>>>>> wrote: > > >>>>>>> > > >>>>>>> Hi all, > > >>>>>>> just a remark about the Flink REST APIs (and its client as well): > > >>>>> almost > > >>>>>>> all the times we need a way to dynamically know which jobs are > > >>>>> contained > > >>>>>> in > > >>>>>>> a jar file, and this could be exposed by the REST endpoint under > > >>>>>>> /jars/:jarid/entry-points (a simple way to implement this would > > >>> be to > > >>>>>> check > > >>>>>>> the value of Main-class or Main-classes inside the Manifest of > the > > >>>> jar > > >>>>> if > > >>>>>>> they exists [1]). 
> > >>>>>>> > > >>>>>>> I understand that this is something that is not strictly required > > >>> to > > >>>>>>> execute Flink jobs but IMHO it would ease A LOT the work of UI > > >>>>> developers > > >>>>>>> that could have a way to show the users all available jobs inside > > >>> a > > >>>>> jar + > > >>>>>>> their configurable parameters. > > >>>>>>> For example, right now in the WebUI, you can upload a jar and > then > > >>>> you > > >>>>>> have > > >>>>>>> to set (without any autocomplete or UI support) the main class > and > > >>>>> their > > >>>>>>> params (for example using a string like --param1 xx --param2 yy). > > >>>>>>> Adding this functionality to the REST API and the respective > > >>> client > > >>>>> would > > >>>>>>> enable the WebUI (and all UIs interacting with a Flink cluster) > to > > >>>>>> prefill > > >>>>>>> a dropdown list containing the list of entry-point classes (i.e. > > >>>> Flink > > >>>>>>> jobs) and, once selected, their required (typed) parameters. > > >>>>>>> > > >>>>>>> Best, > > >>>>>>> Flavio > > >>>>>>> > > >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-10864 > > >>>>>>> > > >>>>>>> On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]> > > >>>>> wrote: > > >>>>>>> > > >>>>>>>> modify > > >>>>>>>> > > >>>>>>>> /we just shutdown the cluster on the exit of client that running > > >>>>> inside > > >>>>>>>> cluster/ > > >>>>>>>> > > >>>>>>>> to > > >>>>>>>> > > >>>>>>>> we just shutdown the cluster on both the exit of client that > > >>> running > > >>>>>> inside > > >>>>>>>> cluster and the finish of job. > > >>>>>>>> Since client is running inside cluster we can easily wait for > the > > >>>> end > > >>>>> of > > >>>>>>>> two both in ClusterEntrypoint. 
> > >>>>>>>> > > >>>>>>>> > > >>>>>>>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午3:13写道: > > >>>>>>>> > > >>>>>>>>> About JobCluster > > >>>>>>>>> > > >>>>>>>>> Actually I am not quite sure what we gains from DETACHED > > >>>>> configuration > > >>>>>> on > > >>>>>>>>> cluster side. > > >>>>>>>>> We don't have a NON-DETACHED JobCluster in fact in our > codebase, > > >>>>> right? > > >>>>>>>>> > > >>>>>>>>> It comes to me one major questions we have to answer first. > > >>>>>>>>> > > >>>>>>>>> *What JobCluster conceptually is exactly* > > >>>>>>>>> > > >>>>>>>>> Related discussion can be found in JIRA[1] and mailing list[2]. > > >>>>> Stephan > > >>>>>>>>> gives a nice > > >>>>>>>>> description of JobCluster: > > >>>>>>>>> > > >>>>>>>>> Two things to add: - The job mode is very nice in the way that > > >>> it > > >>>>> runs > > >>>>>>>> the > > >>>>>>>>> client inside the cluster (in the same image/process that is > the > > >>>> JM) > > >>>>>> and > > >>>>>>>>> thus unifies both applications and what the Spark world calls > > >>> the > > >>>>>> "driver > > >>>>>>>>> mode". - Another thing I would add is that during the FLIP-6 > > >>>> design, > > >>>>> we > > >>>>>>>>> were thinking about setups where Dispatcher and JobManager are > > >>>>> separate > > >>>>>>>>> processes. A Yarn or Mesos Dispatcher of a session could run > > >>>>>>>> independently > > >>>>>>>>> (even as privileged processes executing no code). Then you the > > >>>>>> "per-job" > > >>>>>>>>> mode could still be helpful: when a job is submitted to the > > >>>>> dispatcher, > > >>>>>>>> it > > >>>>>>>>> launches the JM again in a per-job mode, so that JM and TM > > >>>> processes > > >>>>>> are > > >>>>>>>>> bound to teh job only. For higher security setups, it is > > >>> important > > >>>>> that > > >>>>>>>>> processes are not reused across jobs. 
|
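Kostas's "point iii" earlier in the thread — a factory component, separate from the Executor, that hands out a JobClient for an already-running job given only its ID — might look roughly like the toy sketch below. Every name here (ClientFactory, retrieveJobClient, the in-memory registry) is an assumption for illustration; a real component would contact the cluster manager, and this is not part of the agreed proposal yet.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Toy sketch of a factory that retrieves a client for a running job by
// ID, keeping the Executor focused on execute(). All names are
// illustrative assumptions, not the actual Flink API.
interface RunningJobClient {
    String getJobId();
    CompletableFuture<String> stop();  // "stop" implies a savepoint, per the naming discussion
}

class ClientFactory {
    // Stand-in for talking to a cluster manager such as YARN.
    private final Map<String, RunningJobClient> runningJobs = new ConcurrentHashMap<>();

    void register(RunningJobClient client) {
        runningJobs.put(client.getJobId(), client);
    }

    // Attach to a job that is already running, given only its ID.
    RunningJobClient retrieveJobClient(String jobId) {
        RunningJobClient client = runningJobs.get(jobId);
        if (client == null) {
            throw new IllegalArgumentException("no running job with id " + jobId);
        }
        return client;
    }
}

public class RetrieveDemo {
    public static void main(String[] args) throws Exception {
        ClientFactory factory = new ClientFactory();
        factory.register(new RunningJobClient() {
            public String getJobId() { return "a1b2"; }
            public CompletableFuture<String> stop() {
                return CompletableFuture.completedFuture("savepoint-path");
            }
        });

        // Retrieval is decoupled from submission: any process that knows
        // the job ID can obtain a client and, for example, stop the job.
        RunningJobClient client = factory.retrieveJobClient("a1b2");
        System.out.println(client.stop().get());
    }
}
```

This keeps the separation of concerns discussed above: execute() stays on the Executor, while attaching to an existing job lives in its own component.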