[DISCUSS] FLIP-74: Flink JobClient API

[DISCUSS] FLIP-74: Flink JobClient API

tison
Hi all,

Following up on the discussion about introducing a Flink JobClient API[1], we
have drafted FLIP-74[2] to gather thoughts and work towards a standard,
public, user-facing interface.

This discussion thread aims at standardizing the job-level client API. I'd
like to emphasize, though, that how to retrieve a JobClient may trigger
further discussion on the different levels of clients exposed by Flink, so a
follow-up thread will be started later to coordinate FLIP-73 and FLIP-74 on
that exposure question.

Looking forward to your opinions.

Best,
tison.

[1]
https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API

Re: [DISCUSS] FLIP-74: Flink JobClient API

Aljoscha Krettek-2
Hi Tison,

Thanks for proposing the document! I left some comments on it.

I think the only complex thing that we still need to figure out is how to get a JobClient for a job that is already running, as you mentioned in the document. Currently I'm thinking that it's OK to add a method to Executor for retrieving a JobClient for a running job by providing an ID. Let's see what Kostas has to say on the topic.
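A minimal sketch of what this could look like, using plain Java stand-ins; `Executor`, `Pipeline`, `JobClient`, and the method name `retrieveJobClient` below are illustrative assumptions for discussion, not Flink's actual interfaces:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ExecutorSketch {

    // Stand-ins for the types under discussion; purely illustrative.
    interface Pipeline {}

    interface JobClient {
        String getJobId();
    }

    // Executor as suggested here: it executes pipelines, plus one extra
    // method to retrieve a JobClient for an already-running job by its ID.
    interface Executor {
        CompletableFuture<JobClient> execute(Pipeline pipeline);
        CompletableFuture<JobClient> retrieveJobClient(String jobId);
    }

    // A toy in-memory executor so the sketch is runnable.
    static class LocalExecutor implements Executor {
        private final Map<String, JobClient> running = new ConcurrentHashMap<>();
        private int counter = 0;

        public CompletableFuture<JobClient> execute(Pipeline pipeline) {
            String id = "job-" + (++counter);
            JobClient client = () -> id;
            running.put(id, client);
            return CompletableFuture.completedFuture(client);
        }

        public CompletableFuture<JobClient> retrieveJobClient(String jobId) {
            JobClient client = running.get(jobId);
            return client != null
                ? CompletableFuture.completedFuture(client)
                : CompletableFuture.failedFuture(
                      new IllegalArgumentException("No such job: " + jobId));
        }
    }

    public static void main(String[] args) {
        Executor executor = new LocalExecutor();
        JobClient submitted = executor.execute(new Pipeline() {}).join();
        // The same job can later be re-attached to by its ID.
        JobClient retrieved = executor.retrieveJobClient(submitted.getJobId()).join();
        System.out.println(retrieved.getJobId());
    }
}
```

The point of contention in the thread is exactly whether `retrieveJobClient` belongs on the `Executor` at all, which Kostas addresses below.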

Best,
Aljoscha

> On 25. Sep 2019, at 12:31, Zili Chen <[hidden email]> wrote:


Re: [DISCUSS] FLIP-74: Flink JobClient API

Kostas Kloudas-4
Hi Tison,

Thanks for the FLIP and launching the discussion!

As a first note, big +1 on providing/exposing a JobClient to the users!

Some points that would be nice to clarify:
1) You mention that we can get rid of the DETACHED mode: I agree that at a
high level, given that everything will now be asynchronous, there is no need
to keep the DETACHED mode, but I think we should specify some aspects. For
example, without the explicit separation of the modes, what happens when the
job finishes? Does the client always poll periodically for the result, or is
the result pushed when in NON-DETACHED mode? What happens if the client
disconnects and reconnects?

2) On the "how to retrieve a JobClient for a running Job" question, I think
this is related to the other discussion you opened on the ML about
multi-layered clients. First of all, I agree that exposing different
"levels" of clients would be a nice addition, and there have actually been
some discussions about doing so in the future. Now for this specific
discussion:
      i) I do not think that we should expose the
ClusterDescriptor/ClusterSpecification to the user, as this ties us to a
specific architecture which may change in the future.
     ii) I do not think it should be the Executor that provides a JobClient
for an already running job (only for the jobs that it submits). The job of
the executor should just be to execute() a pipeline.
    iii) I think a solution that respects the separation of concerns could
be the addition of another component (in the future), something like a
ClientFactory or ClusterFactory, with methods like: ClusterClient
createCluster(Configuration), JobClient retrieveJobClient(Configuration,
JobId), maybe even (although not sure) Executor getExecutor(Configuration),
and maybe more. This component would be responsible for interacting with a
cluster manager like Yarn and doing what is now done by the
ClusterDescriptor, plus some more.
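Sketching point iii) as code may make it more concrete. The type and method names below are the ones named in this email; everything else (the Configuration stand-in, the toy implementation) is an assumption for illustration and by no means a final design:

```java
import java.util.HashMap;

public class ClusterFactorySketch {

    // Minimal stand-ins for the types mentioned above; illustrative only.
    static class Configuration extends HashMap<String, String> {}
    interface ClusterClient { String clusterId(); }
    interface JobClient { String jobId(); }
    interface Executor {}

    // The component described in point iii): it interacts with a cluster
    // manager (e.g. Yarn) and takes over what ClusterDescriptor does today.
    interface ClusterFactory {
        ClusterClient createCluster(Configuration configuration);
        JobClient retrieveJobClient(Configuration configuration, String jobId);
        // "maybe even (although not sure)":
        Executor getExecutor(Configuration configuration);
    }

    // Toy in-memory factory so the sketch is runnable.
    static class InMemoryClusterFactory implements ClusterFactory {
        public ClusterClient createCluster(Configuration configuration) {
            return () -> "cluster-" + configuration.getOrDefault("name", "default");
        }
        public JobClient retrieveJobClient(Configuration configuration, String jobId) {
            // A real implementation would look the job up in the cluster.
            return () -> jobId;
        }
        public Executor getExecutor(Configuration configuration) {
            return new Executor() {};
        }
    }

    public static void main(String[] args) {
        ClusterFactory factory = new InMemoryClusterFactory();
        System.out.println(factory.createCluster(new Configuration()).clusterId());
        System.out.println(factory.retrieveJobClient(new Configuration(), "abc").jobId());
    }
}
```

With this shape, the Executor stays a pure "execute() a pipeline" component, and re-attaching to a running job is the factory's concern.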

Although all these abstractions (Environments, Executors, ...) use the same
clients under the hood, I believe their existence is not contradictory: they
simply hide some of the complexity from the user, and give us, as
developers, some freedom to change some of the parts in the future. For
example, the executor will take a Pipeline, create a JobGraph, and submit
it, instead of requiring the user to do each step separately. This allows
us, for example, to get rid of the Plan if in the future everything is
DataStream. Essentially, I think of these as layers of an onion, with the
clients close to the core. The higher you go, the more functionality is
included and hidden from the public eye.

Point iii), by the way, is just a thought and by no means final. I also like
the idea of multi-layered clients, so this may spark further discussion.

Cheers,
Kostas

On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek <[hidden email]> wrote:


Re: [DISCUSS] FLIP-74: Flink JobClient API

tison
Thanks for your replies, Kostas & Aljoscha!

Below are my replies, point by point.

1. Regarding DETACHED mode: what I wrote there is about DETACHED mode on
the client side. There are two configurations that overload the term
DETACHED[1].

On the client side, it determines whether client.submitJob blocks until the
job execution result is available. Since client.submitJob returns
CompletableFuture<JobClient>, NON-DETACHED has no power at all: the caller
of submitJob decides whether to block to get the JobClient and then request
the job execution result. If the client crashes, it is a user-scope
exception that should be handled in user code; if the client loses its
connection to the cluster, we have retry-count and retry-interval
configurations that retry automatically and throw a user-scope exception
once exceeded.

Your comment about polling for the result versus having it pushed sounds
like a concern on the cluster side.

On the cluster side, DETACHED mode only exists in the JobCluster. If
DETACHED is configured, the JobCluster exits when the job finishes; if
NON-DETACHED is configured, the JobCluster exits once the job execution
result has been delivered. FLIP-74 does not propose changes in this scope;
it remains as it is.

However, it is an interesting area, and we could revisit this
implementation a bit.

<see the next email for a compact reply on this point>
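A small sketch of the client-side point above: once submitJob returns a CompletableFuture<JobClient>, blocking versus non-blocking is the caller's choice rather than a mode of the client. The mock client and the String result type here are assumptions for illustration, not Flink's real classes:

```java
import java.util.concurrent.CompletableFuture;

public class SubmitJobSketch {

    interface JobClient {
        // Asynchronously fetch the job execution result.
        CompletableFuture<String> getJobExecutionResult();
    }

    // Mock of client.submitJob; a real client would go over the network.
    static CompletableFuture<JobClient> submitJob(String jobGraph) {
        JobClient client = () -> CompletableFuture.completedFuture("result-of-" + jobGraph);
        return CompletableFuture.completedFuture(client);
    }

    public static void main(String[] args) {
        // "NON-DETACHED" behaviour: the caller simply chooses to block
        // on both the submission and the execution result.
        String result = submitJob("wordcount")
            .thenCompose(JobClient::getJobExecutionResult)
            .join();
        System.out.println(result);

        // "DETACHED" behaviour: the caller just does not wait; no mode
        // flag is needed anywhere in the client itself.
        CompletableFuture<JobClient> detached = submitJob("wordcount");
        System.out.println(detached.isDone());
    }
}
```

This is why NON-DETACHED carries no power on the client side: the same asynchronous API serves both usage patterns.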

2. The retrieval of the JobClient is so important that, without a way to
retrieve it, we would have a public user-facing interface that nobody can
reach (what a strange state :P).

As mentioned in the document, two ways of retrieving a JobClient should be
supported:

(1) Retrieved as the return type of job submission.
(2) Retrieved for an existing job (by job id).

I highly respect your thoughts on how Executors should look and on
multi-layered clients. Although (2) is not supported by the public
interfaces summarized in the discussion above, we can discuss the place of
Executors in multi-layered clients a bit and find a way to retrieve the
JobClient of an existing job via a public client API. I will comment in the
FLIP-73 thread[2], since that is mostly about Executors.

Best,
tison.

[1]
https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8
[2]
https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E




Kostas Kloudas <[hidden email]> wrote on Wed, Sep 25, 2019 at 9:29 PM:


Re: [DISCUSS] FLIP-74: Flink JobClient API

tison
About JobCluster:

Actually, I am not quite sure what we gain from the DETACHED configuration
on the cluster side. We don't, in fact, have a NON-DETACHED JobCluster in
our codebase, right?

This brings up one major question we have to answer first:

*What JobCluster conceptually is, exactly*

Related discussion can be found in JIRA[1] and on the mailing list[2].
Stephan gives a nice description of JobCluster:

Two things to add:
- The job mode is very nice in that it runs the client inside the cluster
(in the same image/process as the JM) and thus unifies both applications
and what the Spark world calls "driver mode".
- Another thing I would add is that during the FLIP-6 design, we were
thinking about setups where Dispatcher and JobManager are separate
processes. A Yarn or Mesos Dispatcher of a session could run independently
(even as a privileged process executing no code). Then the "per-job" mode
could still be helpful: when a job is submitted to the dispatcher, it
launches the JM again in per-job mode, so that JM and TM processes are
bound to that job only. For higher-security setups, it is important that
processes are not reused across jobs.

However, currently in "per-job" mode we generate the JobGraph on the
client side, launch the JobCluster, and have it retrieve the JobGraph for
execution. So actually, we don't "run the client inside the cluster".

Besides, referring to the discussion with Till[1], it would be helpful if
"per-job" mode followed the same process as session mode from the user's
perspective: we would not use OptimizedPlanEnvironment to create the
JobGraph, but would directly deploy the Flink cluster in env.execute.

Generally, there are 2 points:

1. Run the Flink job by invoking the user's main method and executing it
throughout, instead of creating the JobGraph from the main class.
2. Run the client inside the cluster.

If 1 and 2 are implemented, there is obviously no need for DETACHED mode on
the cluster side, because we just shut down the cluster on the exit of the
client running inside the cluster. Whether or not the result is delivered
is up to user code.

[1]
https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388
[2]
https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E


Zili Chen <[hidden email]> wrote on Fri, Sep 27, 2019 at 2:13 PM:


Re: [DISCUSS] FLIP-74: Flink JobClient API

tison
A correction: modify

/we just shut down the cluster on the exit of the client running inside the
cluster/

to

we shut down the cluster on both the exit of the client running inside the
cluster and the finish of the job. Since the client runs inside the
cluster, we can easily wait for both in ClusterEntrypoint.
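The waiting logic described here can be expressed with CompletableFuture.allOf. The entrypoint shape below is a hypothetical stand-in for illustration, not Flink's actual ClusterEntrypoint:

```java
import java.util.concurrent.CompletableFuture;

public class ClusterEntrypointSketch {

    // Hypothetical stand-ins: one future completes when the embedded
    // client (the user's main method) returns, the other when the job
    // reaches a terminal state.
    static CompletableFuture<Void> shutdownFuture(
            CompletableFuture<Void> clientExited,
            CompletableFuture<Void> jobFinished) {
        // Shut the cluster down only once BOTH have happened, so neither
        // an early client exit nor an early job finish kills the other.
        return CompletableFuture.allOf(clientExited, jobFinished);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> clientExited = new CompletableFuture<>();
        CompletableFuture<Void> jobFinished = new CompletableFuture<>();
        CompletableFuture<Void> shutdown = shutdownFuture(clientExited, jobFinished);

        clientExited.complete(null);
        System.out.println("after client exit: " + shutdown.isDone());  // false
        jobFinished.complete(null);
        System.out.println("after job finish: " + shutdown.isDone());   // true
    }
}
```

With this formulation there is nothing left for a cluster-side DETACHED flag to decide: the cluster's lifetime is simply the union of the client's and the job's.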


Zili Chen <[hidden email]> wrote on Fri, Sep 27, 2019 at 3:13 PM:


Re: [DISCUSS] FLIP-74: Flink JobClient API

Flavio Pompermaier
Hi all,
just a remark about the Flink REST APIs (and their client as well): very
often we need a way to dynamically know which jobs are contained in a jar
file, and this could be exposed by the REST endpoint under
/jars/:jarid/entry-points (a simple way to implement this would be to check
the value of Main-class or Main-classes inside the Manifest of the jar, if
they exist [1]).

I understand that this is not strictly required to execute Flink jobs, but
IMHO it would ease A LOT the work of UI developers, who would then have a
way to show users all the available jobs inside a jar plus their
configurable parameters.
For example, right now in the WebUI you can upload a jar, but then you have
to set (without any autocomplete or UI support) the main class and its
params (for example using a string like --param1 xx --param2 yy).
Adding this functionality to the REST API and the respective client would
enable the WebUI (and all UIs interacting with a Flink cluster) to prefill
a dropdown list containing the entry-point classes (i.e. Flink jobs) and,
once one is selected, its required (typed) parameters.

Best,
Flavio

[1] https://issues.apache.org/jira/browse/FLINK-10864
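The manifest check Flavio mentions can be done with the JDK's java.util.jar classes. A minimal sketch follows; note that the /jars/:jarid/entry-points endpoint and a Main-classes attribute are his proposal, not existing Flink features, and only the standard Main-Class attribute is read here:

```java
import java.util.jar.Attributes;
import java.util.jar.Manifest;

class EntryPointSketch {
    // Read the entry-point class from a jar manifest, using the standard
    // Main-Class attribute. For a real jar on disk, the Manifest would be
    // obtained via new JarFile(path).getManifest().
    static String readEntryPoint(Manifest manifest) {
        Attributes attrs = manifest.getMainAttributes();
        return attrs.getValue(Attributes.Name.MAIN_CLASS);
    }
}
```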

On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]> wrote:

> modify
>
> /we just shutdown the cluster on the exit of client that running inside
> cluster/
>
> to
>
> we just shut down the cluster on both the exit of the client running
> inside the cluster and the finish of the job.
> Since the client runs inside the cluster, we can easily wait for both in
> ClusterEntrypoint.
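The "wait for both" behavior Zili describes can be sketched with plain CompletableFuture composition. This is only an illustration of the idea; the names are hypothetical and the real ClusterEntrypoint logic is more involved:

```java
import java.util.concurrent.CompletableFuture;

class ClusterShutdownSketch {
    // Shut the cluster down only once BOTH the in-cluster client has
    // exited AND the job has finished, mirroring the proposal.
    static CompletableFuture<Void> shutdownWhenBothDone(
            CompletableFuture<Void> clientExited,
            CompletableFuture<Void> jobFinished,
            Runnable shutdownCluster) {
        return CompletableFuture.allOf(clientExited, jobFinished)
                .thenRun(shutdownCluster);
    }
}
```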
>
>
> Zili Chen <[hidden email]> 于2019年9月27日周五 下午3:13写道:
>
> > About JobCluster
> >
> > Actually I am not quite sure what we gain from the DETACHED
> > configuration on the cluster side.
> > In fact, we don't have a NON-DETACHED JobCluster in our codebase, right?
> >
> > This brings up one major question we have to answer first.
> >
> > *What JobCluster conceptually is exactly*
> >
> > Related discussion can be found in JIRA[1] and mailing list[2]. Stephan
> > gives a nice
> > description of JobCluster:
> >
> > Two things to add: - The job mode is very nice in the way that it runs
> the
> > client inside the cluster (in the same image/process that is the JM) and
> > thus unifies both applications and what the Spark world calls the "driver
> > mode". - Another thing I would add is that during the FLIP-6 design, we
> > were thinking about setups where Dispatcher and JobManager are separate
> > processes. A Yarn or Mesos Dispatcher of a session could run
> independently
> > (even as privileged processes executing no code). Then the "per-job"
> > mode could still be helpful: when a job is submitted to the dispatcher,
> it
> > launches the JM again in a per-job mode, so that JM and TM processes are
> > bound to the job only. For higher security setups, it is important that
> > processes are not reused across jobs.
> >
> > However, currently in "per-job" mode we generate the JobGraph on the
> > client side, launch the JobCluster, and retrieve the JobGraph for
> > execution. So actually, we don't "run the client inside the cluster".
> >
> > Besides, referring to the discussion with Till[1], it would be helpful
> > if, from the user's perspective, "per-job" mode followed the same
> > process as session mode: we don't use OptimizedPlanEnvironment to
> > create the JobGraph, but directly deploy the Flink cluster in
> > env.execute.
> >
> > Generally, 2 points:
> >
> > 1. Run the Flink job by invoking the user's main method and executing
> > it throughout, instead of creating the JobGraph from the main class.
> > 2. Run the client inside the cluster.
> >
> > If 1 and 2 are implemented, there is obviously no need for a DETACHED
> > mode on the cluster side, because we just shut down the cluster on the
> > exit of the client running inside the cluster. Whether or not the
> > result is delivered is up to user code.
> >
> > [1]
> >
> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388
> > [2]
> >
> https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E
> >
> >
> > Zili Chen <[hidden email]> 于2019年9月27日周五 下午2:13写道:
> >
> >> Thanks for your replies Kostas & Aljoscha!
> >>
> >> Below are replies point by point.
> >>
> >> 1. For DETACHED mode, what I said there is about the DETACHED mode on
> >> the client side.
> >> Two configurations overload the DETACHED item[1].
> >>
> >> On the client side, it means whether or not client.submitJob blocks on
> >> the job execution result.
> >> Since client.submitJob returns CompletableFuture<JobClient>,
> >> NON-DETACHED has no power at all. The caller of submitJob decides
> >> whether or not to block to get the JobClient and request the job
> >> execution result. If the client crashes, it is a user-scope exception
> >> that should be handled in user code; if the client loses its connection
> >> to the cluster, we have retry-count and retry-interval configurations
> >> that retry automatically and throw a user-scope exception if exceeded.
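Zili's point — that with CompletableFuture&lt;JobClient&gt; the attached/detached choice moves to the caller — can be illustrated with plain CompletableFuture. The submitJob and getJobExecutionResult below are simplified stand-ins, not the actual client API:

```java
import java.util.concurrent.CompletableFuture;

class SubmissionSketch {
    static class JobClient {
        CompletableFuture<String> getJobExecutionResult() {
            return CompletableFuture.completedFuture("DONE");
        }
    }

    // Stand-in for client.submitJob(jobGraph).
    static CompletableFuture<JobClient> submitJob() {
        return CompletableFuture.completedFuture(new JobClient());
    }

    // "Detached" caller: grab the JobClient, don't wait for the result.
    static JobClient submitDetached() {
        return submitJob().join();
    }

    // "Attached" caller: block all the way to the job execution result.
    static String submitAndWait() {
        return submitJob()
                .thenCompose(JobClient::getJobExecutionResult)
                .join();
    }
}
```

With this shape the client itself never needs a DETACHED flag; both behaviors are compositions the caller chooses.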
> >>
> >> Your comment about polling for the result sounds like a concern on the
> >> cluster side.
> >>
> >> On the cluster side, DETACHED mode exists only in JobCluster. If
> >> DETACHED is configured, the JobCluster exits when the job finishes; if
> >> NON-DETACHED is configured, it exits when the job execution result is
> >> delivered. FLIP-74 does not propose changes in this scope; it is left
> >> as is.
> >>
> >> However, it is an interesting area where we could revisit this
> >> implementation a bit.
> >>
> >> <see the next email for compact reply in this one>
> >>
> >> 2. The retrieval of JobClient is so important that if we don't have a
> >> way to retrieve a JobClient, it is a dumb public user-facing
> >> interface (what a strange state :P).
> >>
> >> About the retrieval of JobClient, as mentioned in the document, two ways
> >> should be supported.
> >>
> >> (1). Retrieved as the return value of job submission.
> >> (2). Retrieved for an existing job (by job id).
> >>
> >> I highly respect your thoughts about how Executors should look and
> >> about multi-layered clients.
> >> Although, as a summary of the discussion above, (2) is not supported by
> >> public interfaces, we can discuss the place of Executors among
> >> multi-layered clients and find a way to retrieve the JobClient of an
> >> existing job with a public client API. I will comment in the FLIP-73
> >> thread[2] since that is mostly about Executors.
> >>
> >> Best,
> >> tison.
> >>
> >> [1]
> >>
> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8
> >> [2]
> >>
> https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E
> >>

Re: [DISCUSS] FLIP-74: Flink JobClient API

Aljoscha Krettek-2
Hi Flavio,

I agree that this would be good to have, but I also think it is outside the scope of FLIP-74; it is an orthogonal feature.

Best,
Aljoscha


Re: [DISCUSS] FLIP-74: Flink JobClient API

Konstantin Knauf-3
Hi Zili,

thanks for working on this topic. Just read through the FLIP and I have two
questions:

* should we add "cancelWithSavepoint" to a new public API when we have
deprecated the corresponding REST API/CLI methods? In my understanding
there is no reason to use it anymore.
* should we call "stopWithSavepoint" simply "stop" as "stop" always
performs a savepoint?

Best,

Konstantin
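For context on the naming question, the job-level operations under discussion might look roughly like this. This is a sketch of the FLIP-74 direction only, not the final interface, and the stub below exists just to make the shape concrete:

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the job-level client surface being discussed. Whether the
// stop operation should be called stop() or stopWithSavepoint(), and
// whether cancelWithSavepoint() belongs in a new public API at all, is
// exactly Konstantin's question.
interface JobClientSketch {
    CompletableFuture<Void> cancel();
    // Stopping always takes a savepoint first, hence the naming debate.
    CompletableFuture<String> stopWithSavepoint(String savepointDirectory);
}

// Trivial stub implementation, returning a fake savepoint path.
class StubJobClient implements JobClientSketch {
    public CompletableFuture<Void> cancel() {
        return CompletableFuture.completedFuture(null);
    }
    public CompletableFuture<String> stopWithSavepoint(String savepointDirectory) {
        return CompletableFuture.completedFuture(savepointDirectory + "/savepoint-1");
    }
}
```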



On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek <[hidden email]>
wrote:

> Hi Flavio,
>
> I agree that this would be good to have. But I also think that this is
> outside the scope of FLIP-74, I think it is an orthogonal feature.
>
> Best,
> Aljoscha
>
> > On 27. Sep 2019, at 10:31, Flavio Pompermaier <[hidden email]>
> wrote:
> >
> > Hi all,
> > just a remark about the Flink REST APIs (and its client as well): almost
> > all the times we need a way to dynamically know which jobs are contained
> in
> > a jar file, and this could be exposed by the REST endpoint under
> > /jars/:jarid/entry-points (a simple way to implement this would be to
> check
> > the value of Main-class or Main-classes inside the Manifest of the jar if
> > they exists [1]).
> >
> > I understand that this is something that is not strictly required to
> > execute Flink jobs but IMHO it would ease A LOT the work of UI developers
> > that could have a way to show the users all available jobs inside a jar +
> > their configurable parameters.
> > For example, right now in the WebUI, you can upload a jar and then you
> have
> > to set (without any autocomplete or UI support) the main class and their
> > params (for example using a string like --param1 xx --param2 yy).
> > Adding this functionality to the REST API and the respective client would
> > enable the WebUI (and all UIs interacting with a Flink cluster) to
> prefill
> > a dropdown list containing the list of entry-point classes (i.e. Flink
> > jobs) and, once selected, their required (typed) parameters.
> >
> > Best,
> > Flavio
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-10864
> >
> > On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]> wrote:
> >
> >> modify
> >>
> >> /we just shutdown the cluster on the exit of client that running inside
> >> cluster/
> >>
> >> to
> >>
> >> we just shutdown the cluster on both the exit of client that running
> inside
> >> cluster and the finish of job.
> >> Since client is running inside cluster we can easily wait for the end of
> >> two both in ClusterEntrypoint.
> >>
> >>
> >> Zili Chen <[hidden email]> 于2019年9月27日周五 下午3:13写道:
> >>
> >>> About JobCluster
> >>>
> >>> Actually I am not quite sure what we gains from DETACHED configuration
> on
> >>> cluster side.
> >>> We don't have a NON-DETACHED JobCluster in fact in our codebase, right?
> >>>
> >>> It comes to me one major questions we have to answer first.
> >>>
> >>> *What JobCluster conceptually is exactly*
> >>>
> >>> Related discussion can be found in JIRA[1] and mailing list[2]. Stephan
> >>> gives a nice
> >>> description of JobCluster:
> >>>
> >>> Two things to add: - The job mode is very nice in the way that it runs
> >> the
> >>> client inside the cluster (in the same image/process that is the JM)
> and
> >>> thus unifies both applications and what the Spark world calls the
> "driver
> >>> mode". - Another thing I would add is that during the FLIP-6 design, we
> >>> were thinking about setups where Dispatcher and JobManager are separate
> >>> processes. A Yarn or Mesos Dispatcher of a session could run
> >> independently
> >>> (even as privileged processes executing no code). Then you the
> "per-job"
> >>> mode could still be helpful: when a job is submitted to the dispatcher,
> >> it
> >>> launches the JM again in a per-job mode, so that JM and TM processes
> are
> >>> bound to teh job only. For higher security setups, it is important that
> >>> processes are not reused across jobs.
> >>>
> >>> However, currently in "per-job" mode we generate JobGraph in client
> side,
> >>> launching
> >>> the JobCluster and retrieve the JobGraph for execution. So actually, we
> >>> don't "run the
> >>> client inside the cluster".
> >>>
> >>> Besides, refer to the discussion with Till[1], it would be helpful we
> >>> follow the same process
> >>> of session mode for that of "per-job" mode in user perspective, that we
> >>> don't use
> >>> OptimizedPlanEnvironment to create JobGraph, but directly deploy Flink
> >>> cluster in env.execute.
> >>>
> >>> Generally 2 points
> >>>
> >>> 1. Running Flink job by invoke user main method and execute throughout,
> >>> instead of create
> >>> JobGraph from main-class.
> >>> 2. Run the client inside the cluster.
> >>>
> >>> If 1 and 2 are implemented. There is obvious no need for DETACHED mode
> in
> >>> cluster side
> >>> because we just shutdown the cluster on the exit of client that running
> >>> inside cluster. Whether
> >>> or not delivered the result is up to user code.
> >>>
> >>> [1]
> >>>
> >>
> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388
> >>> [2]
> >>>
> >>
> https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E
> >>>
> >>>
> >>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午2:13写道:
> >>>
> >>>> Thanks for your replies Kostas & Aljoscha!
> >>>>
> >>>> Below are replies point by point.
> >>>>
> >>>> 1. For DETACHED mode, what I said there is about the DETACHED mode in
> >>>> client side.
> >>>> There are two configurations overload the item DETACHED[1].
> >>>>
> >>>> In client side, it means whether or not client.submitJob is blocking
> to
> >>>> job execution result.
> >>>> Due to client.submitJob returns CompletableFuture<JobClient>
> >> NON-DETACHED
> >>>> is no
> >>>> power at all. Caller of submitJob makes the decision whether or not
> >>>> blocking to get the
> >>>> JobClient and request for the job execution result. If client crashes,
> >> it
> >>>> is a user scope
> >>>> exception that should be handled in user code; if client lost
> connection
> >>>> to cluster, we have
> >>>> a retry times and interval configuration that automatically retry and
> >>>> throws an user scope
> >>>> exception if exceed.
> >>>>
> >>>> Your comment about polling for the result sounds like a concern on
> >>>> the cluster side.
> >>>>
> >>>> On the cluster side, DETACHED mode exists only in the JobCluster. If
> >>>> DETACHED is configured, the JobCluster exits when the job finishes; if
> >>>> NON-DETACHED is configured, the JobCluster exits once the job
> >>>> execution result has been delivered. FLIP-74 doesn't propose changes
> >>>> in this scope; it remains as-is.
> >>>>
> >>>> However, it is an interesting part and we could revisit this
> >>>> implementation a bit.
> >>>>
> >>>> <see the next email for compact reply in this one>
> >>>>
> >>>> 2. The retrieval of the JobClient is so important that, if we don't
> >>>> have a way to retrieve a JobClient, it is a dumb public user-facing
> >>>> interface (what a strange state :P).
> >>>>
> >>>> About the retrieval of the JobClient, as mentioned in the document,
> >>>> two ways should be supported:
> >>>>
> >>>> (1) Retrieve it as the return type of job submission.
> >>>> (2) Retrieve a JobClient for an existing job (by job id).
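[Editor's note: the two retrieval paths above can be sketched as follows. This is a minimal, self-contained illustration in plain JDK types; the JobClient shape, the submitJob signature, and the retrieveJobClient helper are hypothetical stand-ins for whatever FLIP-74 finally specifies, not the real Flink API.]

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical shapes for illustration only; the real FLIP-74
// interfaces live in the Flink codebase and may differ.
public class JobClientSketch {

    interface JobClient {
        String getJobId();
        CompletableFuture<String> getJobExecutionResult();
    }

    // (1) Job submission returns a future JobClient; the caller decides
    // whether to block on it (the old NON-DETACHED behaviour) or not.
    static CompletableFuture<JobClient> submitJob(String jobGraph) {
        return CompletableFuture.completedFuture(new SimpleJobClient("job-1"));
    }

    // (2) A JobClient for an already running job is retrieved by job id.
    static JobClient retrieveJobClient(String jobId) {
        return new SimpleJobClient(jobId);
    }

    static class SimpleJobClient implements JobClient {
        private final String jobId;
        SimpleJobClient(String jobId) { this.jobId = jobId; }
        public String getJobId() { return jobId; }
        public CompletableFuture<String> getJobExecutionResult() {
            return CompletableFuture.completedFuture("FINISHED");
        }
    }

    public static void main(String[] args) {
        // A "detached" caller simply never joins the future.
        CompletableFuture<JobClient> submission = submitJob("my-job-graph");
        // An "attached" caller blocks for the client, then for the result.
        JobClient client = submission.join();
        System.out.println(client.getJobId() + " "
                + client.getJobExecutionResult().join());
    }
}
```

This makes the point above concrete: once submission is asynchronous, "attached vs. detached" is purely a decision of the caller, not a cluster-side mode.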
> >>>>
> >>>> I highly respect your thoughts about how Executors should be, and
> >>>> your thoughts on multi-layered clients.
> >>>> Although (2) is not supported by public interfaces per the summary of
> >>>> the discussion above, we can discuss the place of Executors among
> >>>> multi-layered clients and find a way to retrieve a JobClient for an
> >>>> existing job via the public client API. I will comment in the FLIP-73
> >>>> thread[2] since it is mostly about Executors.
> >>>>
> >>>> Best,
> >>>> tison.
> >>>>
> >>>> [1]
> >>>>
> >>
> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8
> >>>> [2]
> >>>>
> >>
> https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> Kostas Kloudas <[hidden email]> 于2019年9月25日周三 下午9:29写道:
> >>>>
> >>>>> Hi Tison,
> >>>>>
> >>>>> Thanks for the FLIP and launching the discussion!
> >>>>>
> >>>>> As a first note, big +1 on providing/exposing a JobClient to the
> users!
> >>>>>
> >>>>> Some points that would be nice to be clarified:
> >>>>> 1) You mention that we can get rid of the DETACHED mode: I agree
> >>>>> that, at a high level, given that everything will now be
> >>>>> asynchronous, there is no need to keep the DETACHED mode, but I think
> >>>>> we should specify some aspects. For example, without the explicit
> >>>>> separation of the modes, what happens when the job finishes? Does the
> >>>>> client always poll periodically for the result, or is the result
> >>>>> pushed when in NON-DETACHED mode? What happens if the client
> >>>>> disconnects and reconnects?
> >>>>>
> >>>>> 2) On the "how to retrieve a JobClient for a running Job", I think
> >>>>> this is related to the other discussion you opened in the ML about
> >>>>> multi-layered clients. First of all, I agree that exposing different
> >>>>> "levels" of clients would be a nice addition, and actually there have
> >>>>> been some discussions about doing so in the future. Now for this
> >>>>> specific discussion:
> >>>>>      i) I do not think that we should expose the
> >>>>> ClusterDescriptor/ClusterSpecification to the user, as this ties us
> to
> >>>>> a specific architecture which may change in the future.
> >>>>>     ii) I do not think it should be the Executor that will provide a
> >>>>> JobClient for an already running job (only for the Jobs that it
> >>>>> submits). The job of the executor should just be to execute() a
> >>>>> pipeline.
> >>>>>     iii) I think a solution that respects the separation of concerns
> >>>>> could be the addition of another component (in the future), something
> >>>>> like a ClientFactory or ClusterFactory, with methods like
> >>>>> ClusterClient createCluster(Configuration), JobClient
> >>>>> retrieveJobClient(Configuration, JobId), and maybe even (although not
> >>>>> sure) Executor getExecutor(Configuration), and maybe more. This
> >>>>> component would be responsible for interacting with a cluster manager
> >>>>> like Yarn, doing what is now done by the ClusterDescriptor plus some
> >>>>> more.
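[Editor's note: point iii) could look roughly like the sketch below. Every name and signature is hypothetical, lifted from the prose above, with trivial in-memory stand-ins just to show the call pattern; it is not a proposed final design.]

```java
// Illustrative sketch of the ClientFactory idea from point iii).
// All names and signatures here are hypothetical, not a final design.
public class ClientFactorySketch {

    static class Configuration {}
    static class JobId {
        final String id;
        JobId(String id) { this.id = id; }
    }
    interface ClusterClient {}
    interface JobClient { JobId getJobId(); }

    // The factory encapsulates cluster-manager interaction (what the
    // ClusterDescriptor does today) behind a small, stable surface, so the
    // Executor can stay focused on execute()-ing pipelines.
    interface ClientFactory {
        ClusterClient createCluster(Configuration conf);
        JobClient retrieveJobClient(Configuration conf, JobId jobId);
    }

    // Trivial in-memory stand-in, just to show the call pattern.
    static class DummyFactory implements ClientFactory {
        public ClusterClient createCluster(Configuration conf) {
            return new ClusterClient() {};
        }
        public JobClient retrieveJobClient(Configuration conf, JobId jobId) {
            return () -> jobId; // JobClient has one method, so a lambda works
        }
    }

    public static void main(String[] args) {
        ClientFactory factory = new DummyFactory();
        JobClient client =
            factory.retrieveJobClient(new Configuration(), new JobId("job-42"));
        System.out.println(client.getJobId().id);
    }
}
```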
> >>>>>
> >>>>> Although under the hood all these abstractions (Environments,
> >>>>> Executors, ...) use the same clients, I believe their existence is
> >>>>> not contradictory; they simply hide some of the complexity from the
> >>>>> user and give us, as developers, some freedom to change some of the
> >>>>> parts in the future. For example, the executor will take a Pipeline,
> >>>>> create a JobGraph and submit it, instead of requiring the user to do
> >>>>> each step separately. This allows us to, for example, get rid of the
> >>>>> Plan if in the future everything is DataStream. Essentially, I think
> >>>>> of these as layers of an onion, with the clients close to the core.
> >>>>> The higher you go, the more functionality is included and hidden from
> >>>>> the public eye.
> >>>>>
> >>>>> Point iii) by the way is just a thought and by no means final. I also
> >>>>> like the idea of multi-layered clients so this may spark up the
> >>>>> discussion.
> >>>>>
> >>>>> Cheers,
> >>>>> Kostas
> >>>>>
> >>>>> On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek <
> [hidden email]>
> >>>>> wrote:
> >>>>>>
> >>>>>> Hi Tison,
> >>>>>>
> >>>>>> Thanks for proposing the document! I had some comments on the
> >> document.
> >>>>>>
> >>>>>> I think the only complex thing that we still need to figure out is
> >> how
> >>>>> to get a JobClient for a job that is already running. As you
> mentioned
> >> in
> >>>>> the document. Currently I’m thinking that its ok to add a method to
> >>>>> Executor for retrieving a JobClient for a running job by providing an
> >> ID.
> >>>>> Let’s see what Kostas has to say on the topic.
> >>>>>>
> >>>>>> Best,
> >>>>>> Aljoscha
> >>>>>>
> >>>>>>> On 25. Sep 2019, at 12:31, Zili Chen <[hidden email]> wrote:
> >>>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> Summarizing the discussion about introducing a Flink JobClient
> >>>>>>> API[1], we drafted FLIP-74[2] to gather thoughts and work towards
> >>>>>>> standard public user-facing interfaces.
> >>>>>>>
> >>>>>>> This discussion thread aims at standardizing the job-level client
> >>>>>>> API. But I'd like to emphasize that how to retrieve a JobClient may
> >>>>>>> cause further discussion on the different levels of clients exposed
> >>>>>>> by Flink, so a follow-up thread will be started later to coordinate
> >>>>>>> FLIP-73 and FLIP-74 on the exposure issue.
> >>>>>>>
> >>>>>>> Looking forward to your opinions.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> tison.
> >>>>>>>
> >>>>>>> [1]
> >>>>>>>
> >>>>>
> >>
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> >>>>>>> [2]
> >>>>>>>
> >>>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
> >>>>>>
> >>>>>
> >>>>
>
>

--

Konstantin Knauf | Solutions Architect

+49 160 91394525


Follow us @VervericaData Ververica <https://www.ververica.com/>


--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Tony) Cheng

Re: [DISCUSS] FLIP-74: Flink JobClient API

Thomas Weise
I did not realize there was a plan to deprecate anything in the REST API?

The REST API is super important for tooling written in non-JVM languages
that does not include a Flink client (like FlinkK8sOperator). The REST API
should continue to support all job management operations, including job
submission.

Thomas


On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf <[hidden email]>
wrote:

> Hi Zili,
>
> thanks for working on this topic. Just read through the FLIP and I have two
> questions:
>
> * should we add "cancelWithSavepoint" to a new public API, when we have
> deprecated the corresponding REST API/CLI methods? In my understanding
> there is no reason to use it anymore.
> * should we call "stopWithSavepoint" simply "stop", as "stop" always
> performs a savepoint?
>
> Best,
>
> Konstantin
>
>
>
> On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek <[hidden email]>
> wrote:
>
> > Hi Flavio,
> >
> > I agree that this would be good to have. But I also think that this is
> > outside the scope of FLIP-74, I think it is an orthogonal feature.
> >
> > Best,
> > Aljoscha
> >
> > > On 27. Sep 2019, at 10:31, Flavio Pompermaier <[hidden email]>
> > wrote:
> > >
> > > Hi all,
> > > just a remark about the Flink REST APIs (and their client as well):
> > > almost every time we need a way to dynamically know which jobs are
> > > contained in a jar file, and this could be exposed by a REST endpoint
> > > under /jars/:jarid/entry-points (a simple way to implement this would
> > > be to check the value of Main-class or Main-classes inside the
> > > Manifest of the jar, if they exist [1]).
> > >
> > > I understand that this is something that is not strictly required to
> > > execute Flink jobs, but IMHO it would ease A LOT the work of UI
> > > developers, who would have a way to show users all the available jobs
> > > inside a jar plus their configurable parameters.
> > > For example, right now in the WebUI you can upload a jar, but then you
> > > have to set (without any autocomplete or UI support) the main class
> > > and its params (for example using a string like --param1 xx --param2
> > > yy).
> > > Adding this functionality to the REST API and the respective client
> > > would enable the WebUI (and all UIs interacting with a Flink cluster)
> > > to prefill a dropdown list containing the entry-point classes (i.e.
> > > Flink jobs) and, once one is selected, its required (typed) parameters.
> > >
> > > Best,
> > > Flavio
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-10864
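[Editor's note: the manifest check Flavio describes is straightforward with plain java.util.jar. The endpoint path and the multi-entry Main-classes attribute are proposals from the email, not existing Flink features; this sketch only shows reading the standard Main-Class attribute, building a tiny throwaway jar so it is self-contained.]

```java
import java.io.File;
import java.io.FileOutputStream;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

// Reads the Main-Class attribute from a jar's manifest, which is the
// simple implementation route suggested above.
public class EntryPointSketch {

    static String readMainClass(File jar) throws Exception {
        try (JarFile jarFile = new JarFile(jar)) {
            Manifest mf = jarFile.getManifest();
            if (mf == null) {
                return null; // jar without a manifest
            }
            return mf.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }

    public static void main(String[] args) throws Exception {
        // Build a tiny jar with a Main-Class entry to read back.
        File jar = File.createTempFile("entrypoint", ".jar");
        jar.deleteOnExit();
        Manifest mf = new Manifest();
        // Manifest-Version must be set or the main attributes are not written.
        mf.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        mf.getMainAttributes().put(Attributes.Name.MAIN_CLASS,
                "com.example.WordCount");
        try (JarOutputStream out =
                 new JarOutputStream(new FileOutputStream(jar), mf)) {
            // no entries needed for this demo
        }
        System.out.println(readMainClass(jar));
    }
}
```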
> > >
> > > On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]>
> wrote:
> > >
> > >> modify
> > >>
> > >> /we just shutdown the cluster on the exit of client that running
> inside
> > >> cluster/
> > >>
> > >> to
> > >>
> > >> we just shut down the cluster on both the exit of the client running
> > >> inside the cluster and the finish of the job.
> > >> Since the client is running inside the cluster, we can easily wait
> > >> for both in ClusterEntrypoint.
> > >>
> > >>
> > >> Zili Chen <[hidden email]> 于2019年9月27日周五 下午3:13写道:
> > >>
> > >>> About JobCluster
> > >>>
> > >>> Actually I am not quite sure what we gain from the DETACHED
> > >>> configuration on the cluster side.
> > >>> We don't have a NON-DETACHED JobCluster in our codebase anyway,
> > >>> right?
> > >>>
> > >>> This raises one major question we have to answer first:
> > >>>
> > >>> *What JobCluster conceptually is, exactly*
> > >>>
> > >>> Related discussion can be found in JIRA[1] and the mailing list[2].
> > >>> Stephan gives a nice description of JobCluster:
> > >>>
> > >>> Two things to add: - The job mode is very nice in the way that it
> runs
> > >> the
> > >>> client inside the cluster (in the same image/process that is the JM)
> > and
> > >>> thus unifies both applications and what the Spark world calls the
> > "driver
> > >>> mode". - Another thing I would add is that during the FLIP-6 design,
> we
> > >>> were thinking about setups where Dispatcher and JobManager are
> separate
> > >>> processes. A Yarn or Mesos Dispatcher of a session could run
> > >> independently
> > >>> (even as privileged processes executing no code). Then the "per-job"
> > >>> mode could still be helpful: when a job is submitted to the
> > >>> dispatcher, it launches the JM again in per-job mode, so that JM and
> > >>> TM processes are bound to the job only. For higher security setups,
> > >>> it is important that processes are not reused across jobs.
> > >>> However, currently in "per-job" mode we generate the JobGraph on the
> > >>> client side, launch the JobCluster, and let it retrieve the JobGraph
> > >>> for execution. So actually, we don't "run the client inside the
> > >>> cluster".
> > >>>
> > >>> Besides, referring to the discussion with Till[1], it would be
> > >>> helpful if "per-job" mode followed the same process as session mode
> > >>> from the user's perspective: don't use OptimizedPlanEnvironment to
> > >>> create the JobGraph, but directly deploy the Flink cluster in
> > >>> env.execute.
> > >>>
> > >>> Generally, two points:
> > >>>
> > >>> 1. Run the Flink job by invoking the user's main method and
> > >>> executing it throughout, instead of creating the JobGraph from the
> > >>> main class.
> > >>> 2. Run the client inside the cluster.
> > >>>
> > >>> If 1 and 2 are implemented, there is obviously no need for a
> > >>> DETACHED mode on the cluster side, because we just shut down the
> > >>> cluster when the client running inside the cluster exits. Whether or
> > >>> not the result is delivered is up to user code.
> > >>>
> > >>> [1]
> > >>>
> > >>
> >
> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388
> > >>> [2]
> > >>>
> > >>
> >
> https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E
>

Re: [DISCUSS] FLIP-74: Flink JobClient API

Konstantin Knauf-3
Hi Thomas,

maybe there is a misunderstanding. There is no plan to deprecate anything
in the REST API as part of introducing the JobClient API, nor should
there be.

Since "cancel with savepoint" was already deprecated in the REST API and
CLI, I am just raising the question whether to add it to the JobClient API
in the first place.

Best,

Konstantin



On Mon, Sep 30, 2019 at 1:16 AM Thomas Weise <[hidden email]> wrote:

> I did not realize there was a plan to deprecate anything in the REST API?
>
> The REST API is super important for tooling written in non-JVM languages
> that does not include a Flink client (like FlinkK8sOperator). The REST
> API should continue to support all job management operations, including
> job submission.
>
> Thomas
>
>
> On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf <[hidden email]
> >
> wrote:
>
> > Hi Zili,
> >
> > thanks for working on this topic. Just read through the FLIP and I have
> two
> > questions:
> >
> > * should we add "cancelWithSavepoint" to a new public API, when we have
> > deprecated the corresponding REST API/CLI methods? In my understanding
> > there is no reason to use it anymore.
> > * should we call "stopWithSavepoint" simply "stop", as "stop" always
> > performs a savepoint?
> >
> > Best,
> >
> > Konstantin
> >
> >
> >
> > > >>>>
> > > >>>> Kostas Kloudas <[hidden email]> 于2019年9月25日周三 下午9:29写道:
> > > >>>>
> > > >>>>> Hi Tison,
> > > >>>>>
> > > >>>>> Thanks for the FLIP and launching the discussion!
> > > >>>>>
> > > >>>>> As a first note, big +1 on providing/exposing a JobClient to the
> > > users!
> > > >>>>>
> > > >>>>> Some points that would be nice to be clarified:
> > > >>>>> 1) You mention that we can get rid of the DETACHED mode: I agree
> > that
> > > >>>>> at a high level, given that everything will now be asynchronous,
> > > there
> > > >>>>> is no need to keep the DETACHED mode but I think we should
> specify
> > > >>>>> some aspects. For example, without the explicit separation of the
> > > >>>>> modes, what happens when the job finishes. Does the client
> > > >>>>> periodically poll for the result always or the result is pushed
> > when
> > > >>>>> in NON-DETACHED mode? What happens if the client disconnects and
> > > >>>>> reconnects?
> > > >>>>>
> > > >>>>> 2) On the "how to retrieve a JobClient for a running Job", I
> think
> > > >>>>> this is related to the other discussion you opened in the ML
> about
> > > >>>>> multi-layered clients. First of all, I agree that exposing
> > different
> > > >>>>> "levels" of clients would be a nice addition, and actually there
> > have
> > > >>>>> been some discussions about doing so in the future. Now for this
> > > >>>>> specific discussion:
> > > >>>>>      i) I do not think that we should expose the
> > > >>>>> ClusterDescriptor/ClusterSpecification to the user, as this ties
> us
> > > to
> > > >>>>> a specific architecture which may change in the future.
> > > >>>>>     ii) I do not think it should be the Executor that will
> provide
> > a
> > > >>>>> JobClient for an already running job (only for the Jobs that it
> > > >>>>> submits). The job of the executor should just be to execute() a
> > > >>>>> pipeline.
> > > >>>>>     iii) I think a solution that respects the separation of
> > concerns
> > > >>>>> could be the addition of another component (in the future),
> > something
> > > >>>>> like a ClientFactory, or ClusterFactory that will have methods
> > like:
> > > >>>>> ClusterClient createCluster(Configuration), JobClient
> > > >>>>> retrieveJobClient(Configuration , JobId), maybe even (although
> not
> > > >>>>> sure) Executor getExecutor(Configuration ) and maybe more. This
> > > >>>>> component would be responsible to interact with a cluster manager
> > > like
> > > >>>>> Yarn and do what is now being done by the ClusterDescriptor plus
> > some
> > > >>>>> more stuff.
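Kostas's point iii) could be sketched roughly as follows. Every type and method name below is a hypothetical illustration of the proposed separation of concerns, not an existing Flink class or the final API:

```java
// Hypothetical placeholder types sketching the factory idea; all names are
// illustrative assumptions, not real Flink classes.
class Configuration {}

interface ClusterClient { void shutdown(); }

interface JobClient { String getJobID(); }

interface Executor { /* execute(Pipeline) would live here */ }

// The factory owns all interaction with the cluster manager (roughly what
// ClusterDescriptor does internally today), so neither the Executor nor the
// user touches ClusterDescriptor/ClusterSpecification directly.
interface ClusterFactory {
    ClusterClient createCluster(Configuration config);
    JobClient retrieveJobClient(Configuration config, String jobId);
    Executor getExecutor(Configuration config);
}

// Minimal stub implementation, just to show how the pieces plug together.
class StubClusterFactory implements ClusterFactory {
    @Override public ClusterClient createCluster(Configuration config) {
        return () -> { /* no-op shutdown for the stub */ };
    }
    @Override public JobClient retrieveJobClient(Configuration config, String jobId) {
        return () -> jobId;  // single-method interface, so a lambda suffices
    }
    @Override public Executor getExecutor(Configuration config) {
        return new Executor() {};
    }
}

class ClusterFactorySketch {
    public static void main(String[] args) {
        ClusterFactory factory = new StubClusterFactory();
        // Retrieve a JobClient for an already running job by its id,
        // without going through an Executor.
        JobClient client = factory.retrieveJobClient(new Configuration(), "job-42");
        System.out.println(client.getJobID());
    }
}
```

The point of the sketch is only the shape: retrieval by id lives on the factory, while the Executor stays limited to executing pipelines.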
> > > >>>>>
> > > >>>>> Although under the hood all these abstractions (Environments,
> > > >>>>> Executors, ...) underneath use the same clients, I believe their
> > > >>>>> job/existence is not contradicting but they simply hide some of
> the
> > > >>>>> complexity from the user, and give us, as developers some freedom
> > to
> > > >>>>> change in the future some of the parts. For example, the executor
> > > will
> > > >>>>> take a Pipeline, create a JobGraph and submit it, instead of
> > > requiring
> > > >>>>> the user to do each step separately. This allows us to, for
> > example,
> > > >>>>> get rid of the Plan if in the future everything is DataStream.
> > > >>>>> Essentially, I think of these as layers of an onion with the
> > clients
> > > >>>>> being close to the core. The higher you go, the more
> functionality
> > is
> > > >>>>> included and hidden from the public eye.
> > > >>>>>
> > > >>>>> Point iii) by the way is just a thought and by no means final. I
> > also
> > > >>>>> like the idea of multi-layered clients so this may spark up the
> > > >>>>> discussion.
> > > >>>>>
> > > >>>>> Cheers,
> > > >>>>> Kostas
> > > >>>>>
> > > >>>>> On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek <
> > > [hidden email]>
> > > >>>>> wrote:
> > > >>>>>>
> > > >>>>>> Hi Tison,
> > > >>>>>>
> > > >>>>>> Thanks for proposing the document! I had some comments on the
> > > >> document.
> > > >>>>>>
> > > >>>>>> I think the only complex thing that we still need to figure out
> > > >>>>>> is how to get a JobClient for a job that is already running, as
> > > >>>>>> you mentioned in the document. Currently I’m thinking that it's
> > > >>>>>> ok to add a method to Executor for retrieving a JobClient for a
> > > >>>>>> running job by providing an ID.
> > > >>>>>> Let’s see what Kostas has to say on the topic.
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Aljoscha
> > > >>>>>>
> > > >>>>>>> On 25. Sep 2019, at 12:31, Zili Chen <[hidden email]>
> > wrote:
> > > >>>>>>>
> > > >>>>>>> Hi all,
> > > >>>>>>>
> > > >>>>>>> Summary from the discussion about introducing Flink JobClient
> > > >> API[1]
> > > >>>>> we
> > > >>>>>>> draft FLIP-74[2] to
> > > >>>>>>> gather thoughts and towards a standard public user-facing
> > > >> interfaces.
> > > >>>>>>>
> > > >>>>>>> This discussion thread aims at standardizing job level client
> > API.
> > > >>>>> But I'd
> > > >>>>>>> like to emphasize that
> > > >>>>>>> how to retrieve JobClient possibly causes further discussion on
> > > >>>>> different
> > > >>>>>>> level clients exposed from
> > > >>>>>>> Flink so that a following thread will be started later to
> > > >> coordinate
> > > >>>>>>> FLIP-73 and FLIP-74 on
> > > >>>>>>> expose issue.
> > > >>>>>>>
> > > >>>>>>> Looking forward to your opinions.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> tison.
> > > >>>>>>>
> > > >>>>>>> [1]
> > > >>>>>>>
> > > >>>>>
> > > >>
> > >
> >
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> > > >>>>>>> [2]
> > > >>>>>>>
> > > >>>>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > >
> > >
> >
> > --
> >
> > Konstantin Knauf | Solutions Architect
> >
> > +49 160 91394525
> >
> >
> > Follow us @VervericaData Ververica <https://www.ververica.com/>
> >
> >
> > --
> >
> > Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> > Conference
> >
> > Stream Processing | Event Driven | Real Time
> >
> > --
> >
> > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >
> > --
> > Ververica GmbH
> > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> > (Tony) Cheng
> >
>



Re: [DISCUSS] FLIP-74: Flink JobClient API

tison
Hi Konstantin,

* should we add "cancelWithSavepoint" to a new public API, when we have
deprecated the corresponding REST API/CLI methods? In my understanding
there is no reason to use it anymore.

Good point. We can exclude "cancelWithSavepoint" from public API at least
for now,
since it is deprecated already. Let's see if there are other concerns.

* should we call "stopWithSavepoint" simply "stop" as "stop" always
performs a savepoint?

Well, for the naming issue I'm fine with that if it is a consensus of our
community. I can see there is a "stop" CLI command which means "stop with
savepoint".

Best,
tison.
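To make the two changes concrete, here is a rough sketch of how the interface surface could look after excluding cancelWithSavepoint and renaming stopWithSavepoint to stop. The method names and the mock below are illustrative assumptions, not the final FLIP-74 API:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical shape of the proposed interface after the two changes above;
// names are illustrative, NOT the final FLIP-74 API.
interface JobClient {
    String getJobID();
    CompletableFuture<String> getJobStatus();
    CompletableFuture<Void> cancel();
    // "stop" always takes a savepoint, mirroring the CLI "stop" command;
    // no cancelWithSavepoint, since its REST/CLI counterpart is deprecated.
    CompletableFuture<String> stop(String savepointDirectory);
}

// Trivial in-memory mock, only to make the sketch executable.
class MockJobClient implements JobClient {
    private final String jobId;
    private String status = "RUNNING";

    MockJobClient(String jobId) { this.jobId = jobId; }

    @Override public String getJobID() { return jobId; }
    @Override public CompletableFuture<String> getJobStatus() {
        return CompletableFuture.completedFuture(status);
    }
    @Override public CompletableFuture<Void> cancel() {
        status = "CANCELED";
        return CompletableFuture.completedFuture(null);
    }
    @Override public CompletableFuture<String> stop(String savepointDirectory) {
        status = "FINISHED";
        return CompletableFuture.completedFuture(
            savepointDirectory + "/savepoint-" + jobId);
    }
}

class JobClientSketch {
    public static void main(String[] args) {
        JobClient client = new MockJobClient("job-1");
        // All methods are asynchronous; the caller decides whether to block.
        String savepoint = client.stop("s3://savepoints").join();
        System.out.println(savepoint);
        System.out.println(client.getJobStatus().join());
    }
}
```

Every method returning a CompletableFuture keeps the interface consistent with the "all client methods are asynchronous" direction discussed below in the thread.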


Konstantin Knauf <[hidden email]> 于2019年9月30日周一 下午12:16写道:

> Hi Thomas,
>
> maybe there is a misunderstanding. There is no plan to deprecate anything
> in the REST API in the process of introducing the JobClient API, and it
> shouldn't.
>
> Since "cancel with savepoint" was already deprecated in the REST API and
> CLI, I am just raising the question whether to add it to the JobClient API
> in the first place.
>
> Best,
>
> Konstantin
>
>
>
> On Mon, Sep 30, 2019 at 1:16 AM Thomas Weise <[hidden email]> wrote:
>
> > I did not realize there was a plan to deprecate anything in the REST API?
> >
> > The REST API is super important for tooling written in non JVM languages,
> > that does not include a Flink client (like FlinkK8sOperator). The REST
> API
> > should continue to support all job management operations, including job
> > submission.
> >
> > Thomas
> >
> >
> > On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf <
> [hidden email]
> > >
> > wrote:
> >
> > > Hi Zili,
> > >
> > > thanks for working on this topic. Just read through the FLIP and I have
> > two
> > > questions:
> > >
> > > * should we add "cancelWithSavepoint" to a new public API, when we
> have
> > > deprecated the corresponding REST API/CLI methods? In my understanding
> > > there is no reason to use it anymore.
> > > * should we call "stopWithSavepoint" simply "stop" as "stop" always
> > > performs a savepoint?
> > >
> > > Best,
> > >
> > > Konstantin
> > >
> > >
> > >
> > > On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek <[hidden email]
> >
> > > wrote:
> > >
> > > > Hi Flavio,
> > > >
> > > > I agree that this would be good to have. But I also think that this
> is
> > > > outside the scope of FLIP-74, I think it is an orthogonal feature.
> > > >
> > > > Best,
> > > > Aljoscha
> > > >
> > > > > On 27. Sep 2019, at 10:31, Flavio Pompermaier <
> [hidden email]>
> > > > wrote:
> > > > >
> > > > > Hi all,
> > > > > just a remark about the Flink REST APIs (and its client as well):
> > > almost
> > > > > all the time we need a way to dynamically know which jobs are
> > > contained
> > > > in
> > > > > a jar file, and this could be exposed by the REST endpoint under
> > > > > /jars/:jarid/entry-points (a simple way to implement this would be
> to
> > > > check
> > > > > the value of Main-class or Main-classes inside the Manifest of the
> > jar
> > > if
> > > > > they exist [1]).
> > > > >
> > > > > I understand that this is something that is not strictly required
> to
> > > > > execute Flink jobs but IMHO it would ease A LOT the work of UI
> > > developers
> > > > > that could have a way to show the users all available jobs inside a
> > > jar +
> > > > > their configurable parameters.
> > > > > For example, right now in the WebUI, you can upload a jar and then
> > you
> > > > have
> > > > > to set (without any autocomplete or UI support) the main class and
> > > their
> > > > > params (for example using a string like --param1 xx --param2 yy).
> > > > > Adding this functionality to the REST API and the respective client
> > > would
> > > > > enable the WebUI (and all UIs interacting with a Flink cluster) to
> > > > prefill
> > > > > a dropdown list containing the list of entry-point classes (i.e.
> > Flink
> > > > > jobs) and, once selected, their required (typed) parameters.
> > > > >
> > > > > Best,
> > > > > Flavio
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-10864
> > > > >
> > > > > On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]>
> > > wrote:
> > > > >
> > > > >> modify
> > > > >>
> > > > >> /we just shutdown the cluster on the exit of client that running
> > > > >> inside cluster/
> > > > >>
> > > > >> to
> > > > >>
> > > > >> we just shut down the cluster on both the exit of the client
> > > > >> running inside the cluster and the finish of the job.
> > > > >> Since the client runs inside the cluster, we can easily wait for
> > > > >> both in ClusterEntrypoint.
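The "wait for both" logic described above can be expressed as two futures joined in the entrypoint. This is a hedged sketch with made-up names, not the actual ClusterEntrypoint code:

```java
import java.util.concurrent.CompletableFuture;

// Sketch: the entrypoint shuts the cluster down only after BOTH the embedded
// client has exited AND the job has finished. All names are illustrative.
class EntrypointSketch {
    final CompletableFuture<Void> clientExited = new CompletableFuture<>();
    final CompletableFuture<Void> jobFinished = new CompletableFuture<>();
    final CompletableFuture<Void> clusterShutDown = new CompletableFuture<>();

    EntrypointSketch() {
        // allOf completes only once both input futures are complete.
        CompletableFuture.allOf(clientExited, jobFinished)
            .thenRun(() -> clusterShutDown.complete(null));
    }

    public static void main(String[] args) {
        EntrypointSketch e = new EntrypointSketch();
        e.clientExited.complete(null);           // client (inside cluster) exits
        System.out.println(e.clusterShutDown.isDone()); // still waiting on the job
        e.jobFinished.complete(null);            // job reaches a terminal state
        System.out.println(e.clusterShutDown.isDone()); // now the cluster shuts down
    }
}
```

With this structure, neither "client exits first" nor "job finishes first" can shut the cluster down prematurely.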
> > > > >>
> > > > >>
> > > > >> Zili Chen <[hidden email]> 于2019年9月27日周五 下午3:13写道:
> > > > >>
> > > > >>> About JobCluster
> > > > >>>
> > > > >>> Actually I am not quite sure what we gain from the DETACHED
> > > > >>> configuration on the cluster side.
> > > > >>> We don't have a NON-DETACHED JobCluster in our codebase, right?
> > > > >>>
> > > > >>> It comes down to one major question we have to answer first.
> > > > >>>
> > > > >>> *What JobCluster conceptually is exactly*
> > > > >>>
> > > > >>> Related discussion can be found in JIRA[1] and mailing list[2].
> > > Stephan
> > > > >>> gives a nice
> > > > >>> description of JobCluster:
> > > > >>>
> > > > >>> Two things to add: - The job mode is very nice in the way that it
> > > runs
> > > > >> the
> > > > >>> client inside the cluster (in the same image/process that is the
> > JM)
> > > > and
> > > > >>> thus unifies both applications and what the Spark world calls the
> > > > "driver
> > > > >>> mode". - Another thing I would add is that during the FLIP-6
> > design,
> > > we
> > > > >>> were thinking about setups where Dispatcher and JobManager are
> > > separate
> > > > >>> processes. A Yarn or Mesos Dispatcher of a session could run
> > > > >> independently
> > > > >>> (even as privileged processes executing no code). Then the
> > > > >>> "per-job" mode could still be helpful: when a job is submitted to
> > > > >>> the
> > > dispatcher,
> > > > >> it
> > > > >>> launches the JM again in a per-job mode, so that JM and TM
> > processes
> > > > are
> > > > >>> bound to the job only. For higher security setups, it is
> important
> > > that
> > > > >>> processes are not reused across jobs.
> > > > >>>
> > > > >>> However, currently in "per-job" mode we generate the JobGraph on
> > > > >>> the client side, launch the JobCluster, and retrieve the JobGraph
> > > > >>> for execution. So actually, we don't "run the client inside the
> > > > >>> cluster".
> > > > >>>
> > > > >>> Besides, refer to the discussion with Till[1], it would be
> helpful
> > we
> > > > >>> follow the same process
> > > > >>> of session mode for that of "per-job" mode in user perspective,
> > that
> > > we
> > > > >>> don't use
> > > > >>> OptimizedPlanEnvironment to create JobGraph, but directly deploy
> > > Flink
> > > > >>> cluster in env.execute.
> > > > >>>
> > > > >>> Generally 2 points
> > > > >>>
> > > > >>> 1. Running Flink job by invoke user main method and execute
> > > throughout,
> > > > >>> instead of create
> > > > >>> JobGraph from main-class.
> > > > >>> 2. Run the client inside the cluster.
> > > > >>>
> > > > >>> If 1 and 2 are implemented, there is obviously no need for a
> > > > >>> DETACHED mode on the cluster side, because we just shut down the
> > > > >>> cluster on the exit of the client running inside the cluster.
> > > > >>> Whether or not the result is delivered is up to user code.
> > > > >>>
> > > > >>> [1]
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388
> > > > >>> [2]
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E
> > > > >>>
> > > > >>>
> > > > >>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午2:13写道:
> > > > >>>
> > > > >>>> Thanks for your replies Kostas & Aljoscha!
> > > > >>>>
> > > > >>>> Below are replies point by point.
> > > > >>>>
> > > > >>>> 1. For DETACHED mode, what I said there is about the DETACHED
> > > > >>>> mode on the client side. There are two configurations overloading
> > > > >>>> the item DETACHED[1].
> > > > >>>>
> > > > >>>> On the client side, it means whether or not client.submitJob
> > > > >>>> blocks until the job execution result. Since client.submitJob
> > > > >>>> returns CompletableFuture<JobClient>, NON-DETACHED has no power
> > > > >>>> at all. The caller of submitJob decides whether or not to block
> > > > >>>> to get the JobClient and request the job execution result. If
> > > > >>>> the client crashes, it is a user-scope exception that should be
> > > > >>>> handled in user code; if the client loses its connection to the
> > > > >>>> cluster, we have a retry-times and interval configuration that
> > > > >>>> automatically retries and throws a user-scope exception if
> > > > >>>> exceeded.
> > > > >>>>
> > > > >>>> Your comment about polling for the result sounds like a concern
> > > > >>>> on the cluster side.
> > > > >>>>
> > > > >>>> On the cluster side, DETACHED mode is alive only in JobCluster.
> > > > >>>> If DETACHED is configured, JobCluster exits when the job
> > > > >>>> finishes; if NON-DETACHED is configured, JobCluster exits when
> > > > >>>> the job execution result is delivered. FLIP-74 doesn't make
> > > > >>>> changes in this scope; it remains as is.
> > > > >>>>
> > > > >>>> However, it is an interesting part; we can revisit this
> > > > >>>> implementation a bit.
> > > > >>>>
> > > > >>>> <see the next email for compact reply in this one>
> > > > >>>>
> > > > >>>> 2. The retrieval of JobClient is so important that if we don't
> > > > >>>> have a way to retrieve a JobClient, it is a dumb public
> > > > >>>> user-facing interface (what a strange state :P).
> > > > >>>>
> > > > >>>> About the retrieval of JobClient, as mentioned in the document,
> > > > >>>> two ways should be supported.
> > > > >>>>
> > > > >>>> (1). Retrieved as return type of job submission.
> > > > >>>> (2). Retrieve a JobClient of an existing job (with a job id).
> > > > >>>>
> > > > >>>> I highly respect your thoughts about how Executors should be and
> > > > >>>> your thoughts on multi-layered clients. Although (2) is not
> > > > >>>> supported by public interfaces per the summary of the discussion
> > > > >>>> above, we can discuss a bit the place of Executors among
> > > > >>>> multi-layered clients and find a way to retrieve the JobClient
> > > > >>>> of an existing job with the public client API. I will comment in
> > > > >>>> the FLIP-73 thread[2] since it is mostly about Executors.
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> tison.
> > > > >>>>
> > > > >>>> [1]
> > > > >>>>
> > > > >>
> > > >
> > >
> >
> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8
> > > > >>>> [2]
> > > > >>>>
> > > > >>
> > > >
> > >
> >
> https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> Kostas Kloudas <[hidden email]> 于2019年9月25日周三 下午9:29写道:
> > > > >>>>
> > > > >>>>> Hi Tison,
> > > > >>>>>
> > > > >>>>> Thanks for the FLIP and launching the discussion!
> > > > >>>>>
> > > > >>>>> As a first note, big +1 on providing/exposing a JobClient to
> the
> > > > users!
> > > > >>>>>
> > > > >>>>> Some points that would be nice to be clarified:
> > > > >>>>> 1) You mention that we can get rid of the DETACHED mode: I
> agree
> > > that
> > > > >>>>> at a high level, given that everything will now be
> asynchronous,
> > > > there
> > > > >>>>> is no need to keep the DETACHED mode but I think we should
> > specify
> > > > >>>>> some aspects. For example, without the explicit separation of
> the
> > > > >>>>> modes, what happens when the job finishes. Does the client
> > > > >>>>> always poll periodically for the result, or is the result
> > > > >>>>> pushed when in NON-DETACHED mode? What happens if the client
> > > > >>>>> disconnects and reconnects?
> > > > >>>>>
> > > > >>>>> 2) On the "how to retrieve a JobClient for a running Job", I
> > think
> > > > >>>>> this is related to the other discussion you opened in the ML
> > about
> > > > >>>>> multi-layered clients. First of all, I agree that exposing
> > > different
> > > > >>>>> "levels" of clients would be a nice addition, and actually
> there
> > > have
> > > > >>>>> been some discussions about doing so in the future. Now for
> this
> > > > >>>>> specific discussion:
> > > > >>>>>      i) I do not think that we should expose the
> > > > >>>>> ClusterDescriptor/ClusterSpecification to the user, as this
> ties
> > us
> > > > to
> > > > >>>>> a specific architecture which may change in the future.
> > > > >>>>>     ii) I do not think it should be the Executor that will
> > provide
> > > a
> > > > >>>>> JobClient for an already running job (only for the Jobs that it
> > > > >>>>> submits). The job of the executor should just be to execute() a
> > > > >>>>> pipeline.
> > > > >>>>>     iii) I think a solution that respects the separation of
> > > concerns
> > > > >>>>> could be the addition of another component (in the future),
> > > something
> > > > >>>>> like a ClientFactory, or ClusterFactory that will have methods
> > > like:
> > > > >>>>> ClusterClient createCluster(Configuration), JobClient
> > > > >>>>> retrieveJobClient(Configuration , JobId), maybe even (although
> > not
> > > > >>>>> sure) Executor getExecutor(Configuration ) and maybe more. This
> > > > >>>>> component would be responsible to interact with a cluster
> manager
> > > > like
> > > > >>>>> Yarn and do what is now being done by the ClusterDescriptor
> plus
> > > some
> > > > >>>>> more stuff.
> > > > >>>>>
> > > > >>>>> Although under the hood all these abstractions (Environments,
> > > > >>>>> Executors, ...) underneath use the same clients, I believe
> their
> > > > >>>>> job/existence is not contradicting but they simply hide some of
> > the
> > > > >>>>> complexity from the user, and give us, as developers some
> freedom
> > > to
> > > > >>>>> change in the future some of the parts. For example, the
> executor
> > > > will
> > > > >>>>> take a Pipeline, create a JobGraph and submit it, instead of
> > > > requiring
> > > > >>>>> the user to do each step separately. This allows us to, for
> > > example,
> > > > >>>>> get rid of the Plan if in the future everything is DataStream.
> > > > >>>>> Essentially, I think of these as layers of an onion with the
> > > clients
> > > > >>>>> being close to the core. The higher you go, the more
> > functionality
> > > is
> > > > >>>>> included and hidden from the public eye.
> > > > >>>>>
> > > > >>>>> Point iii) by the way is just a thought and by no means final.
> I
> > > also
> > > > >>>>> like the idea of multi-layered clients so this may spark up the
> > > > >>>>> discussion.
> > > > >>>>>
> > > > >>>>> Cheers,
> > > > >>>>> Kostas
> > > > >>>>>
> > > > >>>>> On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek <
> > > > [hidden email]>
> > > > >>>>> wrote:
> > > > >>>>>>
> > > > >>>>>> Hi Tison,
> > > > >>>>>>
> > > > >>>>>> Thanks for proposing the document! I had some comments on the
> > > > >> document.
> > > > >>>>>>
> > > > >>>>>> I think the only complex thing that we still need to figure
> > > > >>>>>> out is how to get a JobClient for a job that is already
> > > > >>>>>> running, as you mentioned in the document. Currently I’m
> > > > >>>>>> thinking that it's ok to add a method to Executor for
> > > > >>>>>> retrieving a JobClient for a running job by providing an ID.
> > > > >>>>>> Let’s see what Kostas has to say on the topic.
> > > > >>>>>>
> > > > >>>>>> Best,
> > > > >>>>>> Aljoscha
> > > > >>>>>>
> > > > >>>>>>> On 25. Sep 2019, at 12:31, Zili Chen <[hidden email]>
> > > wrote:
> > > > >>>>>>>
> > > > >>>>>>> Hi all,
> > > > >>>>>>>
> > > > >>>>>>> Summary from the discussion about introducing Flink JobClient
> > > > >> API[1]
> > > > >>>>> we
> > > > >>>>>>> draft FLIP-74[2] to
> > > > >>>>>>> gather thoughts and towards a standard public user-facing
> > > > >> interfaces.
> > > > >>>>>>>
> > > > >>>>>>> This discussion thread aims at standardizing job level client
> > > API.
> > > > >>>>> But I'd
> > > > >>>>>>> like to emphasize that
> > > > >>>>>>> how to retrieve JobClient possibly causes further discussion
> on
> > > > >>>>> different
> > > > >>>>>>> level clients exposed from
> > > > >>>>>>> Flink so that a following thread will be started later to
> > > > >> coordinate
> > > > >>>>>>> FLIP-73 and FLIP-74 on
> > > > >>>>>>> expose issue.
> > > > >>>>>>>
> > > > >>>>>>> Looking forward to your opinions.
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> tison.
> > > > >>>>>>>
> > > > >>>>>>> [1]
> > > > >>>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> > > > >>>>>>> [2]
> > > > >>>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > >
> > > >
> > >
> > >
> >
>
>
>

Re: [DISCUSS] FLIP-74: Flink JobClient API

tison
Hi all,

Narrowing the scope to FLIP-74, we aim to introduce a useful and extensible
user-facing public interface, JobClient. Let me re-emphasize the two major
work items under this thread.

1. standard interface

As in FLIP-74 we introduce an interface JobClient with its methods, we'd
like to make it a standard (non-final, since we can always extend it on
demand) interface.

On this branch I'd like to, following Konstantin's suggestions, 1) exclude
the deprecated cancelWithSavepoint from the proposal and 2) rename
stopWithSavepoint to stop to keep consistency with our CLI command. If
there are no more concerns on these topics I will update the proposal
tomorrow.

2. client interfaces are asynchronous

If the asynchronous JobClient interfaces are approved, a necessary proposed
change is a corresponding update to the ClusterClient interfaces.
ClusterClient remains an internal concept after this FLIP, but the change
might have some impact, so I think it's better to reach a community
consensus as a prerequisite. Note that with all client methods being
asynchronous, the client-side DETACHED option has no power, whether or not
we remove it.
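To illustrate why the client-side DETACHED option loses its power: with an asynchronous submitJob, "detached" vs. "non-detached" is just whether the caller waits on the returned future. A self-contained sketch follows; the types are mocks mirroring the proposal, not the real ClusterClient API:

```java
import java.util.concurrent.CompletableFuture;

class AsyncSubmissionSketch {
    // Stand-in for the proposed JobClient; not the real Flink type.
    static class JobClient {
        final String jobId;
        final CompletableFuture<String> executionResult;
        JobClient(String jobId, CompletableFuture<String> executionResult) {
            this.jobId = jobId;
            this.executionResult = executionResult;
        }
    }

    // Asynchronous submission: returns immediately. The mock "job" finishes
    // at once so the sketch stays self-contained.
    static CompletableFuture<JobClient> submitJob(String jobId) {
        return CompletableFuture.completedFuture(
            new JobClient(jobId, CompletableFuture.completedFuture("FINISHED")));
    }

    public static void main(String[] args) {
        // "Detached": submit and move on; nobody blocks on the future.
        submitJob("job-a");

        // "Non-detached": the same call, but the caller chooses to wait.
        String state = submitJob("job-b")
            .thenCompose(client -> client.executionResult)
            .join();
        System.out.println(state);
    }
}
```

The client offers exactly one code path; attaching to the result is a decision made by the caller, not a submission-time mode.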

Let me know your ideas on these topics so we can keep moving forward :-)

Best,
tison.


Zili Chen <[hidden email]> 于2019年10月2日周三 下午4:10写道:

> Hi Konstantin,
>
> * should we add "cancelWithSavepoint" to a new public API, when we have
> deprecated the corresponding REST API/CLI methods? In my understanding
> there is no reason to use it anymore.
>
> Good point. We can exclude "cancelWithSavepoint" from public API at least
> for now,
> since it is deprecated already. Let's see if there are other concerns.
>
> * should we call "stopWithSavepoint" simply "stop" as "stop" always
> performs a savepoint?
>
> Well, for the naming issue I'm fine with that if it is a consensus of our
> community. I can see there is a "stop" CLI command which means "stop with
> savepoint".
>
> Best,
> tison.
>
>
> Konstantin Knauf <[hidden email]> 于2019年9月30日周一 下午12:16写道:
>
>> Hi Thomas,
>>
>> maybe there is a misunderstanding. There is no plan to deprecate anything
>> in the REST API in the process of introducing the JobClient API, and it
>> shouldn't.
>>
>> Since "cancel with savepoint" was already deprecated in the REST API and
>> CLI, I am just raising the question whether to add it to the JobClient API
>> in the first place.
>>
>> Best,
>>
>> Konstantin
>>
>>
>>
>> On Mon, Sep 30, 2019 at 1:16 AM Thomas Weise <[hidden email]> wrote:
>>
>> > I did not realize there was a plan to deprecate anything in the REST
>> API?
>> >
>> > The REST API is super important for tooling written in non JVM
>> languages,
>> > that does not include a Flink client (like FlinkK8sOperator). The REST
>> API
>> > should continue to support all job management operations, including job
>> > submission.
>> >
>> > Thomas
>> >
>> >
>> > On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf <
>> [hidden email]
>> > >
>> > wrote:
>> >
>> > > Hi Zili,
>> > >
>> > > thanks for working on this topic. Just read through the FLIP and I
>> have
>> > two
>> > > questions:
>> > >
>> > > * should we add "cancelWithSavepoint" to a new public API, when we
>> have
>> > > deprecated the corresponding REST API/CLI methods? In my understanding
>> > > there is no reason to use it anymore.
>> > > * should we call "stopWithSavepoint" simply "stop" as "stop" always
>> > > performs a savepoint?
>> > >
>> > > Best,
>> > >
>> > > Konstantin
>> > >
>> > >
>> > >
>> > > On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek <
>> [hidden email]>
>> > > wrote:
>> > >
>> > > > Hi Flavio,
>> > > >
>> > > > I agree that this would be good to have. But I also think that this
>> is
>> > > > outside the scope of FLIP-74, I think it is an orthogonal feature.
>> > > >
>> > > > Best,
>> > > > Aljoscha
>> > > >
>> > > > > On 27. Sep 2019, at 10:31, Flavio Pompermaier <
>> [hidden email]>
>> > > > wrote:
>> > > > >
>> > > > > Hi all,
>> > > > > just a remark about the Flink REST APIs (and its client as well):
>> > > almost
>> > > > all the time we need a way to dynamically know which jobs are
>> > > contained
>> > > > in
>> > > > > a jar file, and this could be exposed by the REST endpoint under
>> > > > > /jars/:jarid/entry-points (a simple way to implement this would
>> be to
>> > > > check
>> > > > > the value of Main-class or Main-classes inside the Manifest of the
>> > jar
>> > > if
>> > > > > they exist [1]).
>> > > > >
>> > > > > I understand that this is something that is not strictly required
>> to
>> > > > > execute Flink jobs but IMHO it would ease A LOT the work of UI
>> > > developers
>> > > > > that could have a way to show the users all available jobs inside
>> a
>> > > jar +
>> > > > > their configurable parameters.
>> > > > > For example, right now in the WebUI, you can upload a jar and then
>> > you
>> > > > have
>> > > > > to set (without any autocomplete or UI support) the main class and
>> > > their
>> > > > > params (for example using a string like --param1 xx --param2 yy).
>> > > > > Adding this functionality to the REST API and the respective
>> client
>> > > would
>> > > > > enable the WebUI (and all UIs interacting with a Flink cluster) to
>> > > > prefill
>> > > > > a dropdown list containing the list of entry-point classes (i.e.
>> > Flink
>> > > > > jobs) and, once selected, their required (typed) parameters.
>> > > > >
>> > > > > Best,
>> > > > > Flavio
>> > > > >
>> > > > > [1] https://issues.apache.org/jira/browse/FLINK-10864
>> > > > >
>> > > > > On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]>
>> > > wrote:
>> > > > >
>> > > > >> modify
>> > > > >>
>> > > > >> /we just shutdown the cluster on the exit of client that running
>> > > inside
>> > > > >> cluster/
>> > > > >>
>> > > > >> to
>> > > > >>
>> > > > >> we just shutdown the cluster on both the exit of client that
>> running
>> > > > inside
>> > > > >> cluster and the finish of job.
>> > > > >> Since client is running inside cluster we can easily wait for the
>> > end
>> > > of
>> > > > >> two both in ClusterEntrypoint.
>> > > > >>
>> > > > >>
>> > > > >> Zili Chen <[hidden email]> 于2019年9月27日周五 下午3:13写道:
>> > > > >>
>> > > > >>> About JobCluster
>> > > > >>>
>> > > > >>> Actually I am not quite sure what we gain from DETACHED
>> > > configuration
>> > > > on
>> > > > >>> cluster side.
>> > > > >>> We don't have a NON-DETACHED JobCluster in fact in our codebase,
>> > > right?
>> > > > >>>
>> > > > >>> It comes to me one major question we have to answer first.
>> > > > >>>
>> > > > >>> *What JobCluster conceptually is exactly*
>> > > > >>>
>> > > > >>> Related discussion can be found in JIRA[1] and mailing list[2].
>> > > Stephan
>> > > > >>> gives a nice
>> > > > >>> description of JobCluster:
>> > > > >>>
>> > > > >>> Two things to add: - The job mode is very nice in the way that
>> it
>> > > runs
>> > > > >> the
>> > > > >>> client inside the cluster (in the same image/process that is the
>> > JM)
>> > > > and
>> > > > >>> thus unifies both applications and what the Spark world calls
>> the
>> > > > "driver
>> > > > >>> mode". - Another thing I would add is that during the FLIP-6
>> > design,
>> > > we
>> > > > >>> were thinking about setups where Dispatcher and JobManager are
>> > > separate
>> > > > >>> processes. A Yarn or Mesos Dispatcher of a session could run
>> > > > >> independently
>> > > > >>> (even as privileged processes executing no code). Then the
>> > > > "per-job"
>> > > > >>> mode could still be helpful: when a job is submitted to the
>> > > dispatcher,
>> > > > >> it
>> > > > >>> launches the JM again in a per-job mode, so that JM and TM
>> > processes
>> > > > are
>> > > > >>> bound to the job only. For higher security setups, it is
>> important
>> > > that
>> > > > >>> processes are not reused across jobs.
>> > > > >>>
>> > > > >>> However, currently in "per-job" mode we generate JobGraph in
>> client
>> > > > side,
>> > > > >>> launching
>> > > > >>> the JobCluster and retrieve the JobGraph for execution. So
>> > actually,
>> > > we
>> > > > >>> don't "run the
>> > > > >>> client inside the cluster".
>> > > > >>>
>> > > > >>> Besides, refer to the discussion with Till[1], it would be
>> helpful
>> > we
>> > > > >>> follow the same process
>> > > > >>> of session mode for that of "per-job" mode in user perspective,
>> > that
>> > > we
>> > > > >>> don't use
>> > > > >>> OptimizedPlanEnvironment to create JobGraph, but directly deploy
>> > > Flink
>> > > > >>> cluster in env.execute.
>> > > > >>>
>> > > > >>> Generally 2 points
>> > > > >>>
>> > > > >>> 1. Running Flink job by invoke user main method and execute
>> > > throughout,
>> > > > >>> instead of create
>> > > > >>> JobGraph from main-class.
>> > > > >>> 2. Run the client inside the cluster.
>> > > > >>>
>> > > > >>> If 1 and 2 are implemented. There is obvious no need for
>> DETACHED
>> > > mode
>> > > > in
>> > > > >>> cluster side
>> > > > >>> because we just shutdown the cluster on the exit of client that
>> > > running
>> > > > >>> inside cluster. Whether
>> > > > >>> or not delivered the result is up to user code.
>> > > > >>>
>> > > > >>> [1]
>> > > > >>>
>> > > > >>
>> > > >
>> > >
>> >
>> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388
>> > > > >>> [2]
>> > > > >>>
>> > > > >>
>> > > >
>> > >
>> >
>> https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E
>> > > > >>>
>> > > > >>>
>> > > > >>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午2:13写道:
>> > > > >>>
>> > > > >>>> Thanks for your replies Kostas & Aljoscha!
>> > > > >>>>
>> > > > >>>> Below are replies point by point.
>> > > > >>>>
>> > > > >>>> 1. For DETACHED mode, what I said there is about the DETACHED
>> mode
>> > > in
>> > > > >>>> client side.
>> > > > >>>> There are two configurations overload the item DETACHED[1].
>> > > > >>>>
>> > > > >>>> In client side, it means whether or not client.submitJob is
>> > blocking
>> > > > to
>> > > > >>>> job execution result.
>> > > > >>>> Due to client.submitJob returns CompletableFuture<JobClient>
>> > > > >> NON-DETACHED
>> > > > >>>> is no
>> > > > >>>> power at all. Caller of submitJob makes the decision whether or
>> > not
>> > > > >>>> blocking to get the
>> > > > >>>> JobClient and request for the job execution result. If client
>> > > crashes,
>> > > > >> it
>> > > > >>>> is a user scope
>> > > > >>>> exception that should be handled in user code; if client lost
>> > > > connection
>> > > > >>>> to cluster, we have
>> > > > >>>> a retry times and interval configuration that automatically
>> retry
>> > > and
>> > > > >>>> throws an user scope
>> > > > >>>> exception if exceed.
>> > > > >>>>
>> > > > >>>> Your comment about poll for result or job result sounds like a
>> > > concern
>> > > > >> on
>> > > > >>>> cluster side.
>> > > > >>>>
>> > > > >>>> In cluster side, DETACHED mode is alive only in JobCluster. If
>> > > > DETACHED
>> > > > >>>> configured,
>> > > > >>>> JobCluster exits on job finished; if NON-DETACHED configured,
>> > > > JobCluster
>> > > > >>>> exits on job
>> > > > >>>> execution result delivered. FLIP-74 doesn't stick to changes on
>> > this
>> > > > >>>> scope, it is just remained.
>> > > > >>>>
>> > > > >>>> However, it is an interesting part we can revisit this
>> > > implementation
>> > > > a
>> > > > >>>> bit.
>> > > > >>>>
>> > > > >>>> <see the next email for compact reply in this one>
>> > > > >>>>
>> > > > >>>> 2. The retrieval of JobClient is so important that if we don't
>> > have
>> > > a
>> > > > >> way
>> > > > >>>> to retrieve JobClient it is
>> > > > >>>> a dumb public user-facing interface(what a strange state :P).
>> > > > >>>>
>> > > > >>>> About the retrieval of JobClient, as mentioned in the document,
>> > two
>> > > > ways
>> > > > >>>> should be supported.
>> > > > >>>>
>> > > > >>>> (1). Retrieved as return type of job submission.
>> > > > >>>> (2). Retrieve a JobClient of existing job.(with job id)
>> > > > >>>>
>> > > > >>>> I highly respect your thoughts about how Executors should be
>> and
>> > > > >> thoughts
>> > > > >>>> on multi-layered clients.
>> > > > >>>> Although, (2) is not supported by public interfaces as summary
>> of
>> > > > >>>> discussion above, we can discuss
>> > > > >>>> a bit on the place of Executors on multi-layered clients and
>> find
>> > a
>> > > > way
>> > > > >>>> to retrieve JobClient of
>> > > > >>>> existing job with public client API. I will comment in FLIP-73
>> > > > thread[2]
>> > > > >>>> since it is almost about Executors.
>> > > > >>>>
>> > > > >>>> Best,
>> > > > >>>> tison.
>> > > > >>>>
>> > > > >>>> [1]
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> >
>> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8
>> > > > >>>> [2]
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> >
>> https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E
>> > > > >>>>
>> > > > >>>>
>> > > > >>>>
>> > > > >>>>
>> > > > >>>> Kostas Kloudas <[hidden email]> 于2019年9月25日周三 下午9:29写道:
>> > > > >>>>
>> > > > >>>>> Hi Tison,
>> > > > >>>>>
>> > > > >>>>> Thanks for the FLIP and launching the discussion!
>> > > > >>>>>
>> > > > >>>>> As a first note, big +1 on providing/exposing a JobClient to
>> the
>> > > > users!
>> > > > >>>>>
>> > > > >>>>> Some points that would be nice to be clarified:
>> > > > >>>>> 1) You mention that we can get rid of the DETACHED mode: I
>> agree
>> > > that
>> > > > >>>>> at a high level, given that everything will now be
>> asynchronous,
>> > > > there
>> > > > >>>>> is no need to keep the DETACHED mode but I think we should
>> > specify
>> > > > >>>>> some aspects. For example, without the explicit separation of
>> the
>> > > > >>>>> modes, what happens when the job finishes. Does the client
>> > > > >>>>> periodically poll for the result always or the result is
>> pushed
>> > > when
>> > > > >>>>> in NON-DETACHED mode? What happens if the client disconnects
>> and
>> > > > >>>>> reconnects?
>> > > > >>>>>
>> > > > >>>>> 2) On the "how to retrieve a JobClient for a running Job", I
>> > think
>> > > > >>>>> this is related to the other discussion you opened in the ML
>> > about
>> > > > >>>>> multi-layered clients. First of all, I agree that exposing
>> > > different
>> > > > >>>>> "levels" of clients would be a nice addition, and actually
>> there
>> > > have
>> > > > >>>>> been some discussions about doing so in the future. Now for
>> this
>> > > > >>>>> specific discussion:
>> > > > >>>>>      i) I do not think that we should expose the
>> > > > >>>>> ClusterDescriptor/ClusterSpecification to the user, as this
>> ties
>> > us
>> > > > to
>> > > > >>>>> a specific architecture which may change in the future.
>> > > > >>>>>     ii) I do not think it should be the Executor that will
>> > provide
>> > > a
>> > > > >>>>> JobClient for an already running job (only for the Jobs that
>> it
>> > > > >>>>> submits). The job of the executor should just be to execute()
>> a
>> > > > >>>>> pipeline.
>> > > > >>>>>     iii) I think a solution that respects the separation of
>> > > concerns
>> > > > >>>>> could be the addition of another component (in the future),
>> > > something
>> > > > >>>>> like a ClientFactory, or ClusterFactory that will have methods
>> > > like:
>> > > > >>>>> ClusterClient createCluster(Configuration), JobClient
>> > > > >>>>> retrieveJobClient(Configuration , JobId), maybe even (although
>> > not
>> > > > >>>>> sure) Executor getExecutor(Configuration ) and maybe more.
>> This
>> > > > >>>>> component would be responsible to interact with a cluster
>> manager
>> > > > like
>> > > > >>>>> Yarn and do what is now being done by the ClusterDescriptor
>> plus
>> > > some
>> > > > >>>>> more stuff.
>> > > > >>>>>
>> > > > >>>>> Although under the hood all these abstractions (Environments,
>> > > > >>>>> Executors, ...) underneath use the same clients, I believe
>> their
>> > > > >>>>> job/existence is not contradicting but they simply hide some
>> of
>> > the
>> > > > >>>>> complexity from the user, and give us, as developers some
>> freedom
>> > > to
>> > > > >>>>> change in the future some of the parts. For example, the
>> executor
>> > > > will
>> > > > >>>>> take a Pipeline, create a JobGraph and submit it, instead of
>> > > > requiring
>> > > > >>>>> the user to do each step separately. This allows us to, for
>> > > example,
>> > > > >>>>> get rid of the Plan if in the future everything is DataStream.
>> > > > >>>>> Essentially, I think of these as layers of an onion with the
>> > > clients
>> > > > >>>>> being close to the core. The higher you go, the more
>> > functionality
>> > > is
>> > > > >>>>> included and hidden from the public eye.
>> > > > >>>>>
>> > > > >>>>> Point iii) by the way is just a thought and by no means
>> final. I
>> > > also
>> > > > >>>>> like the idea of multi-layered clients so this may spark up
>> the
>> > > > >>>>> discussion.
>> > > > >>>>>
>> > > > >>>>> Cheers,
>> > > > >>>>> Kostas
>> > > > >>>>>
>> > > > >>>>> On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek <
>> > > > [hidden email]>
>> > > > >>>>> wrote:
>> > > > >>>>>>
>> > > > >>>>>> Hi Tison,
>> > > > >>>>>>
>> > > > >>>>>> Thanks for proposing the document! I had some comments on the
>> > > > >> document.
>> > > > >>>>>>
>> > > > >>>>>> I think the only complex thing that we still need to figure
>> out
>> > is
>> > > > >> how
>> > > > >>>>> to get a JobClient for a job that is already running. As you
>> > > > mentioned
>> > > > >> in
>> > > > >>>>> the document. Currently I’m thinking that its ok to add a
>> method
>> > to
>> > > > >>>>> Executor for retrieving a JobClient for a running job by
>> > providing
>> > > an
>> > > > >> ID.
>> > > > >>>>> Let’s see what Kostas has to say on the topic.
>> > > > >>>>>>
>> > > > >>>>>> Best,
>> > > > >>>>>> Aljoscha
>> > > > >>>>>>
>> > > > >>>>>>> On 25. Sep 2019, at 12:31, Zili Chen <[hidden email]>
>> > > wrote:
>> > > > >>>>>>>
>> > > > >>>>>>> Hi all,
>> > > > >>>>>>>
>> > > > >>>>>>> Summary from the discussion about introducing Flink
>> JobClient
>> > > > >> API[1]
>> > > > >>>>> we
>> > > > >>>>>>> draft FLIP-74[2] to
>> > > > >>>>>>> gather thoughts and towards a standard public user-facing
>> > > > >> interfaces.
>> > > > >>>>>>>
>> > > > >>>>>>> This discussion thread aims at standardizing job level
>> client
>> > > API.
>> > > > >>>>> But I'd
>> > > > >>>>>>> like to emphasize that
>> > > > >>>>>>> how to retrieve JobClient possibly causes further
>> discussion on
>> > > > >>>>> different
>> > > > >>>>>>> level clients exposed from
>> > > > >>>>>>> Flink so that a following thread will be started later to
>> > > > >> coordinate
>> > > > >>>>>>> FLIP-73 and FLIP-74 on
>> > > > >>>>>>> expose issue.
>> > > > >>>>>>>
>> > > > >>>>>>> Looking forward to your opinions.
>> > > > >>>>>>>
>> > > > >>>>>>> Best,
>> > > > >>>>>>> tison.
>> > > > >>>>>>>
>> > > > >>>>>>> [1]
>> > > > >>>>>>>
>> > > > >>>>>
>> > > > >>
>> > > >
>> > >
>> >
>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>> > > > >>>>>>> [2]
>> > > > >>>>>>>
>> > > > >>>>>
>> > > > >>
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
>> > > > >>>>>>
>> > > > >>>>>
>> > > > >>>>
>> > > >
>> > > >
>> > >
>> > > --
>> > >
>> > > Konstantin Knauf | Solutions Architect
>> > >
>> > > +49 160 91394525
>> > >
>> > >
>> > > Follow us @VervericaData Ververica <https://www.ververica.com/>
>> > >
>> > >
>> > > --
>> > >
>> > > Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> > > Conference
>> > >
>> > > Stream Processing | Event Driven | Real Time
>> > >
>> > > --
>> > >
>> > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> > >
>> > > --
>> > > Ververica GmbH
>> > > Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>> Ji
>> > > (Tony) Cheng
>> > >
>> >
>>
>>
>> --
>>
>> Konstantin Knauf | Solutions Architect
>>
>> +49 160 91394525
>>
>>
>> Follow us @VervericaData Ververica <https://www.ververica.com/>
>>
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Tony) Cheng
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-74: Flink JobClient API

tison
Hi Kostas,

By mentioning "integration to be a follow-up discussion" in the FLIP-73 discussion,
I think I'm more in the context of FLIP-74, because without including the
retrieval of JobClient in FLIP-74 we would actually introduce a dummy public
interface.

1. Returning JobClient from Executor#execute actually has a dependency on
FLIP-73.
2. Retrieving the JobClient of an existing job directly leads to the discussion
of the retrieval chain, which I started as [DISCUSS] Expose multiple level
clients.

Best,
tison.
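To make the two retrieval paths above concrete, here is a minimal, hedged sketch in Java. It is purely illustrative: `JobClient`, `MiniCluster`, `submitJob`, and `retrieveJobClient` are hypothetical names standing in for whatever shape FLIP-74/FLIP-73 finally settle on, not the actual Flink API. It shows (1) a `JobClient` obtained as the result of an asynchronous submission and (2) a `JobClient` re-attached to an already running job by its id — and why, with futures everywhere, a NON-DETACHED mode adds nothing (the caller simply chooses whether to block).

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class JobClientSketch {

    /** Hypothetical job-level client, roughly following the FLIP-74 discussion:
     *  all methods are asynchronous and return futures. */
    interface JobClient {
        String getJobId();
        CompletableFuture<String> getJobStatus();
        CompletableFuture<Void> cancel();
    }

    /** Toy stand-in for an executor/cluster. Real Flink would talk to a
     *  JobManager; this just keeps job states in a map for illustration. */
    static class MiniCluster {
        private final Map<String, String> jobs = new ConcurrentHashMap<>();

        /** (1) JobClient returned as the result of job submission. */
        CompletableFuture<JobClient> submitJob(String jobName) {
            String jobId = UUID.randomUUID().toString();
            jobs.put(jobId, "RUNNING");
            return CompletableFuture.completedFuture(clientFor(jobId));
        }

        /** (2) JobClient retrieved for an existing job by its id. */
        JobClient retrieveJobClient(String jobId) {
            if (!jobs.containsKey(jobId)) {
                throw new IllegalArgumentException("unknown job: " + jobId);
            }
            return clientFor(jobId);
        }

        private JobClient clientFor(String jobId) {
            return new JobClient() {
                public String getJobId() { return jobId; }
                public CompletableFuture<String> getJobStatus() {
                    return CompletableFuture.completedFuture(jobs.get(jobId));
                }
                public CompletableFuture<Void> cancel() {
                    jobs.put(jobId, "CANCELED");
                    return CompletableFuture.completedFuture(null);
                }
            };
        }
    }

    public static void main(String[] args) throws Exception {
        MiniCluster cluster = new MiniCluster();

        // Path (1): the submission future yields the client; blocking on it
        // (or not) is the caller's choice, so NON-DETACHED carries no power.
        JobClient fromSubmission = cluster.submitJob("word-count").get();
        System.out.println(fromSubmission.getJobStatus().get()); // RUNNING

        // Path (2): re-attach to the running job by id and cancel it.
        JobClient reattached = cluster.retrieveJobClient(fromSubmission.getJobId());
        reattached.cancel().get();
        System.out.println(reattached.getJobStatus().get()); // CANCELED
    }
}
```

Note that path (2) is exactly the part still under discussion: whether the component that hands out a `JobClient` for an existing job is the Executor, or a separate factory as Kostas suggested.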


Zili Chen <[hidden email]> 于2019年10月3日周四 上午2:35写道:

> Hi all,
>
> Narrowing the scope to FLIP-74, we aim to introduce a useful and extensible
> user-facing public interface, JobClient. Let me re-emphasize the two major
> work items under this thread.
>
> 1. standard interface
>
> As in FLIP-74 we introduce an interface JobClient with its methods, we'd
> like to make it a standard interface (non-final, since we can always extend
> it on demand).
>
> On this branch, following Konstantin's suggestion, I'd like to 1) exclude
> the deprecated cancelWithSavepoint from the proposal and 2) rename
> stopWithSavepoint to stop, to keep consistency with our CLI command. If
> there are no further concerns on these topics I will update the proposal
> tomorrow.
>
> 2. client interfaces are asynchronous
>
> If the asynchronous JobClient interfaces are approved, a necessary proposed
> change is a corresponding update to the ClusterClient interfaces.
> ClusterClient remains an internal concept after this FLIP, but the change
> might have some impact, so I think it's better to reach a community
> consensus as a prerequisite. Note that once all client methods are
> asynchronous, the client-side DETACHED option has no effect whether or not
> we remove it.
>
> Let me know your ideas on these topics and let's keep moving forward :-)
>
> Best,
> tison.
>
>
> Zili Chen <[hidden email]> 于2019年10月2日周三 下午4:10写道:
>
>> Hi Konstantin,
>>
>> * should we add "cancelWithSavepeoint" to a new public API, when we have
>> deprecated the corresponding REST API/CLI methods? In my understanding
>> there is no reason to use it anymore.
>>
>> Good point. We can exclude "cancelWithSavepoint" from public API at least
>> for now,
>> since it is deprecated already. Let's see if there is other concerns.
>>
>> * should we call "stopWithSavepoint" simply "stop" as "stop" always
>> performs a savepoint?
>>
>> Well, for the naming issue I'm fine with that if it is a consensus of our
>> community. I can see there is a "stop" CLI command which means "stop with
>> savepoint".
>>
>> Best,
>> tison.
>>
>>
>> Konstantin Knauf <[hidden email]> 于2019年9月30日周一 下午12:16写道:
>>
>>> Hi Thomas,
>>>
>>> maybe there is a misunderstanding. There is no plan to deprecate anything
>>> in the REST API in the process of introducing the JobClient API, and it
>>> shouldn't.
>>>
>>> Since "cancel with savepoint" was already deprecated in the REST API and
>>> CLI, I am just raising the question whether to add it to the JobClient
>>> API
>>> in the first place.
>>>
>>> Best,
>>>
>>> Konstantin
>>>
>>>
>>>
>>> On Mon, Sep 30, 2019 at 1:16 AM Thomas Weise <[hidden email]> wrote:
>>>
>>> > I did not realize there was a plan to deprecate anything in the REST
>>> API?
>>> >
>>> > The REST API is super important for tooling written in non JVM
>>> languages,
>>> > that does not include a Flink client (like FlinkK8sOperator). The REST
>>> API
>>> > should continue to support all job management operations, including job
>>> > submission.
>>> >
>>> > Thomas
>>> >
>>> >
>>> > On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf <
>>> [hidden email]
>>> > >
>>> > wrote:
>>> >
>>> > > Hi Zili,
>>> > >
>>> > > thanks for working on this topic. Just read through the FLIP and I
>>> have
>>> > two
>>> > > questions:
>>> > >
>>> > > * should we add "cancelWithSavepeoint" to a new public API, when we
>>> have
>>> > > deprecated the corresponding REST API/CLI methods? In my
>>> understanding
>>> > > there is no reason to use it anymore.
>>> > > * should we call "stopWithSavepoint" simply "stop" as "stop" always
>>> > > performs a savepoint?
>>> > >
>>> > > Best,
>>> > >
>>> > > Konstantin
>>> > >
>>> > >
>>> > >
>>> > > On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek <
>>> [hidden email]>
>>> > > wrote:
>>> > >
>>> > > > Hi Flavio,
>>> > > >
>>> > > > I agree that this would be good to have. But I also think that
>>> this is
>>> > > > outside the scope of FLIP-74, I think it is an orthogonal feature.
>>> > > >
>>> > > > Best,
>>> > > > Aljoscha
>>> > > >
>>> > > > > On 27. Sep 2019, at 10:31, Flavio Pompermaier <
>>> [hidden email]>
>>> > > > wrote:
>>> > > > >
>>> > > > > Hi all,
>>> > > > > just a remark about the Flink REST APIs (and its client as well):
>>> > > almost
>>> > > > > all the times we need a way to dynamically know which jobs are
>>> > > contained
>>> > > > in
>>> > > > > a jar file, and this could be exposed by the REST endpoint under
>>> > > > > /jars/:jarid/entry-points (a simple way to implement this would
>>> be to
>>> > > > check
>>> > > > > the value of Main-class or Main-classes inside the Manifest of
>>> the
>>> > jar
>>> > > if
>>> > > > > they exists [1]).
>>> > > > >
>>> > > > > I understand that this is something that is not strictly
>>> required to
>>> > > > > execute Flink jobs but IMHO it would ease A LOT the work of UI
>>> > > developers
>>> > > > > that could have a way to show the users all available jobs
>>> inside a
>>> > > jar +
>>> > > > > their configurable parameters.
>>> > > > > For example, right now in the WebUI, you can upload a jar and
>>> then
>>> > you
>>> > > > have
>>> > > > > to set (without any autocomplete or UI support) the main class
>>> and
>>> > > their
>>> > > > > params (for example using a string like --param1 xx --param2 yy).
>>> > > > > Adding this functionality to the REST API and the respective
>>> client
>>> > > would
>>> > > > > enable the WebUI (and all UIs interacting with a Flink cluster)
>>> to
>>> > > > prefill
>>> > > > > a dropdown list containing the list of entry-point classes (i.e.
>>> > Flink
>>> > > > > jobs) and, once selected, their required (typed) parameters.
>>> > > > >
>>> > > > > Best,
>>> > > > > Flavio
>>> > > > >
>>> > > > > [1] https://issues.apache.org/jira/browse/FLINK-10864
>>> > > > >
>>> > > > > On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]>
>>> > > wrote:
>>> > > > >
>>> > > > >> modify
>>> > > > >>
>>> > > > >> /we just shutdown the cluster on the exit of client that running
>>> > > inside
>>> > > > >> cluster/
>>> > > > >>
>>> > > > >> to
>>> > > > >>
>>> > > > >> we just shutdown the cluster on both the exit of client that
>>> running
>>> > > > inside
>>> > > > >> cluster and the finish of job.
>>> > > > >> Since client is running inside cluster we can easily wait for
>>> the
>>> > end
>>> > > of
>>> > > > >> two both in ClusterEntrypoint.
>>> > > > >>
>>> > > > >>
>>> > > > >> Zili Chen <[hidden email]> 于2019年9月27日周五 下午3:13写道:
>>> > > > >>
>>> > > > >>> About JobCluster
>>> > > > >>>
>>> > > > >>> Actually I am not quite sure what we gain from DETACHED
>>> > > configuration
>>> > > > on
>>> > > > >>> cluster side.
>>> > > > >>> We don't have a NON-DETACHED JobCluster in fact in our
>>> codebase,
>>> > > right?
>>> > > > >>>
>>> > > > >>> It comes to me one major question we have to answer first.
>>> > > > >>>
>>> > > > >>> *What JobCluster conceptually is exactly*
>>> > > > >>>
>>> > > > >>> Related discussion can be found in JIRA[1] and mailing list[2].
>>> > > Stephan
>>> > > > >>> gives a nice
>>> > > > >>> description of JobCluster:
>>> > > > >>>
>>> > > > >>> Two things to add: - The job mode is very nice in the way that
>>> it
>>> > > runs
>>> > > > >> the
>>> > > > >>> client inside the cluster (in the same image/process that is
>>> the
>>> > JM)
>>> > > > and
>>> > > > >>> thus unifies both applications and what the Spark world calls
>>> the
>>> > > > "driver
>>> > > > >>> mode". - Another thing I would add is that during the FLIP-6
>>> > design,
>>> > > we
>>> > > > >>> were thinking about setups where Dispatcher and JobManager are
>>> > > separate
>>> > > > >>> processes. A Yarn or Mesos Dispatcher of a session could run
>>> > > > >> independently
>>> > > > >>> (even as privileged processes executing no code). Then the
>>> > > > "per-job"
>>> > > > >>> mode could still be helpful: when a job is submitted to the
>>> > > dispatcher,
>>> > > > >> it
>>> > > > >>> launches the JM again in a per-job mode, so that JM and TM
>>> > processes
>>> > > > are
>>> > > > >>> bound to the job only. For higher security setups, it is
>>> important
>>> > > that
>>> > > > >>> processes are not reused across jobs.
>>> > > > >>>
>>> > > > >>> However, currently in "per-job" mode we generate JobGraph in
>>> client
>>> > > > side,
>>> > > > >>> launching
>>> > > > >>> the JobCluster and retrieve the JobGraph for execution. So
>>> > actually,
>>> > > we
>>> > > > >>> don't "run the
>>> > > > >>> client inside the cluster".
>>> > > > >>>
>>> > > > >>> Besides, refer to the discussion with Till[1], it would be
>>> helpful
>>> > we
>>> > > > >>> follow the same process
>>> > > > >>> of session mode for that of "per-job" mode in user perspective,
>>> > that
>>> > > we
>>> > > > >>> don't use
>>> > > > >>> OptimizedPlanEnvironment to create JobGraph, but directly
>>> deploy
>>> > > Flink
>>> > > > >>> cluster in env.execute.
>>> > > > >>>
>>> > > > >>> Generally 2 points
>>> > > > >>>
>>> > > > >>> 1. Running Flink job by invoke user main method and execute
>>> > > throughout,
>>> > > > >>> instead of create
>>> > > > >>> JobGraph from main-class.
>>> > > > >>> 2. Run the client inside the cluster.
>>> > > > >>>
>>> > > > >>> If 1 and 2 are implemented. There is obvious no need for
>>> DETACHED
>>> > > mode
>>> > > > in
>>> > > > >>> cluster side
>>> > > > >>> because we just shutdown the cluster on the exit of client that
>>> > > running
>>> > > > >>> inside cluster. Whether
>>> > > > >>> or not delivered the result is up to user code.
>>> > > > >>>
>>> > > > >>> [1]
>>> > > > >>>
>>> > > > >>
>>> > > >
>>> > >
>>> >
>>> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388
>>> > > > >>> [2]
>>> > > > >>>
>>> > > > >>
>>> > > >
>>> > >
>>> >
>>> https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E
>>> > > > >>>
>>> > > > >>>
>>> > > > >>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午2:13写道:
>>> > > > >>>
>>> > > > >>>> Thanks for your replies Kostas & Aljoscha!
>>> > > > >>>>
>>> > > > >>>> Below are replies point by point.
>>> > > > >>>>
>>> > > > >>>> 1. For DETACHED mode, what I said there is about the DETACHED
>>> mode
>>> > > in
>>> > > > >>>> client side.
>>> > > > >>>> There are two configurations overload the item DETACHED[1].
>>> > > > >>>>
>>> > > > >>>> In client side, it means whether or not client.submitJob is
>>> > blocking
>>> > > > to
>>> > > > >>>> job execution result.
>>> > > > >>>> Due to client.submitJob returns CompletableFuture<JobClient>
>>> > > > >> NON-DETACHED
>>> > > > >>>> is no
>>> > > > >>>> power at all. Caller of submitJob makes the decision whether
>>> or
>>> > not
>>> > > > >>>> blocking to get the
>>> > > > >>>> JobClient and request for the job execution result. If client
>>> > > crashes,
>>> > > > >> it
>>> > > > >>>> is a user scope
>>> > > > >>>> exception that should be handled in user code; if client lost
>>> > > > connection
>>> > > > >>>> to cluster, we have
>>> > > > >>>> a retry times and interval configuration that automatically
>>> retry
>>> > > and
>>> > > > >>>> throws an user scope
>>> > > > >>>> exception if exceed.
>>> > > > >>>>
>>> > > > >>>> Your comment about poll for result or job result sounds like a
>>> > > concern
>>> > > > >> on
>>> > > > >>>> cluster side.
>>> > > > >>>>
>>> > > > >>>> In cluster side, DETACHED mode is alive only in JobCluster. If
>>> > > > DETACHED
>>> > > > >>>> configured,
>>> > > > >>>> JobCluster exits on job finished; if NON-DETACHED configured,
>>> > > > JobCluster
>>> > > > >>>> exits on job
>>> > > > >>>> execution result delivered. FLIP-74 doesn't stick to changes
>>> on
>>> > this
>>> > > > >>>> scope, it is just remained.
>>> > > > >>>>
>>> > > > >>>> However, it is an interesting part we can revisit this
>>> > > implementation
>>> > > > a
>>> > > > >>>> bit.
>>> > > > >>>>
>>> > > > >>>> <see the next email for compact reply in this one>
>>> > > > >>>>
>>> > > > >>>> 2. The retrieval of JobClient is so important that if we don't
>>> > have
>>> > > a
>>> > > > >> way
>>> > > > >>>> to retrieve JobClient it is
>>> > > > >>>> a dumb public user-facing interface(what a strange state :P).
>>> > > > >>>>
>>> > > > >>>> About the retrieval of JobClient, as mentioned in the
>>> document,
>>> > two
>>> > > > ways
>>> > > > >>>> should be supported.
>>> > > > >>>>
>>> > > > >>>> (1). Retrieved as return type of job submission.
>>> > > > >>>> (2). Retrieve a JobClient of existing job.(with job id)
>>> > > > >>>>
>>> > > > >>>> I highly respect your thoughts about how Executors should be
>>> and
>>> > > > >> thoughts
>>> > > > >>>> on multi-layered clients.
>>> > > > >>>> Although, (2) is not supported by public interfaces as
>>> summary of
>>> > > > >>>> discussion above, we can discuss
>>> > > > >>>> a bit on the place of Executors on multi-layered clients and
>>> find
>>> > a
>>> > > > way
>>> > > > >>>> to retrieve JobClient of
>>> > > > >>>> existing job with public client API. I will comment in FLIP-73
>>> > > > thread[2]
>>> > > > >>>> since it is almost about Executors.
>>> > > > >>>>
>>> > > > >>>> Best,
>>> > > > >>>> tison.
>>> > > > >>>>
>>> > > > >>>> [1]
>>> > > > >>>>
>>> > > > >>
>>> > > >
>>> > >
>>> >
>>> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8
>>> > > > >>>> [2]
>>> > > > >>>>
>>> > > > >>
>>> > > >
>>> > >
>>> >
>>> https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E
>>> > > > >>>>
>>> > > > >>>>
>>> > > > >>>>
>>> > > > >>>>
>>> > > > >>>> Kostas Kloudas <[hidden email]> 于2019年9月25日周三 下午9:29写道:
>>> > > > >>>>
>>> > > > >>>>> Hi Tison,
>>> > > > >>>>>
>>> > > > >>>>> Thanks for the FLIP and launching the discussion!
>>> > > > >>>>>
>>> > > > >>>>> As a first note, big +1 on providing/exposing a JobClient to
>>> the
>>> > > > users!
>>> > > > >>>>>
>>> > > > >>>>> Some points that would be nice to be clarified:
>>> > > > >>>>> 1) You mention that we can get rid of the DETACHED mode: I
>>> agree
>>> > > that
>>> > > > >>>>> at a high level, given that everything will now be
>>> asynchronous,
>>> > > > there
>>> > > > >>>>> is no need to keep the DETACHED mode but I think we should
>>> > specify
>>> > > > >>>>> some aspects. For example, without the explicit separation
>>> of the
>>> > > > >>>>> modes, what happens when the job finishes. Does the client
>>> > > > >>>>> periodically poll for the result always or the result is
>>> pushed
>>> > > when
>>> > > > >>>>> in NON-DETACHED mode? What happens if the client disconnects
>>> and
>>> > > > >>>>> reconnects?
>>> > > > >>>>>
>>> > > > >>>>> 2) On the "how to retrieve a JobClient for a running Job", I
>>> > think
>>> > > > >>>>> this is related to the other discussion you opened in the ML
>>> > about
>>> > > > >>>>> multi-layered clients. First of all, I agree that exposing
>>> > > different
>>> > > > >>>>> "levels" of clients would be a nice addition, and actually
>>> there
>>> > > have
>>> > > > >>>>> been some discussions about doing so in the future. Now for
>>> this
>>> > > > >>>>> specific discussion:
>>> > > > >>>>>      i) I do not think that we should expose the
>>> > > > >>>>> ClusterDescriptor/ClusterSpecification to the user, as this
>>> ties
>>> > us
>>> > > > to
>>> > > > >>>>> a specific architecture which may change in the future.
>>> > > > >>>>>     ii) I do not think it should be the Executor that will
>>> > provide
>>> > > a
>>> > > > >>>>> JobClient for an already running job (only for the Jobs that
>>> it
>>> > > > >>>>> submits). The job of the executor should just be to
>>> execute() a
>>> > > > >>>>> pipeline.
>>> > > > >>>>>     iii) I think a solution that respects the separation of
>>> > > concerns
>>> > > > >>>>> could be the addition of another component (in the future),
>>> > > something
>>> > > > >>>>> like a ClientFactory, or ClusterFactory that will have
>>> methods
>>> > > like:
>>> > > > >>>>> ClusterClient createCluster(Configuration), JobClient
>>> > > > >>>>> retrieveJobClient(Configuration , JobId), maybe even
>>> (although
>>> > not
>>> > > > >>>>> sure) Executor getExecutor(Configuration ) and maybe more.
>>> This
>>> > > > >>>>> component would be responsible to interact with a cluster
>>> manager
>>> > > > like
>>> > > > >>>>> Yarn and do what is now being done by the ClusterDescriptor
>>> plus
>>> > > some
>>> > > > >>>>> more stuff.
>>> > > > >>>>>
>>> > > > >>>>> Although under the hood all these abstractions (Environments,
>>> > > > >>>>> Executors, ...) underneath use the same clients, I believe
>>> their
>>> > > > >>>>> job/existence is not contradicting but they simply hide some
>>> of
>>> > the
>>> > > > >>>>> complexity from the user, and give us, as developers some
>>> freedom
>>> > > to
>>> > > > >>>>> change in the future some of the parts. For example, the
>>> executor
>>> > > > will
>>> > > > >>>>> take a Pipeline, create a JobGraph and submit it, instead of
>>> > > > requiring
>>> > > > >>>>> the user to do each step separately. This allows us to, for
>>> > > example,
>>> > > > >>>>> get rid of the Plan if in the future everything is
>>> DataStream.
>>> > > > >>>>> Essentially, I think of these as layers of an onion with the
>>> > > clients
>>> > > > >>>>> being close to the core. The higher you go, the more
>>> > functionality
>>> > > is
>>> > > > >>>>> included and hidden from the public eye.
>>> > > > >>>>>
>>> > > > >>>>> Point iii) by the way is just a thought and by no means
>>> final. I
>>> > > also
>>> > > > >>>>> like the idea of multi-layered clients so this may spark up
>>> the
>>> > > > >>>>> discussion.
>>> > > > >>>>>
>>> > > > >>>>> Cheers,
>>> > > > >>>>> Kostas
>>> > > > >>>>>
>>> > > > >>>>> On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek <
>>> > > > [hidden email]>
>>> > > > >>>>> wrote:
>>> > > > >>>>>>
>>> > > > >>>>>> Hi Tison,
>>> > > > >>>>>>
>>> > > > >>>>>> Thanks for proposing the document! I had some comments on
>>> the
>>> > > > >> document.
>>> > > > >>>>>>
>>> > > > >>>>>> I think the only complex thing that we still need to figure
>>> out
>>> > is
>>> > > > >> how
>>> > > > >>>>> to get a JobClient for a job that is already running. As you
>>> > > > mentioned
>>> > > > >> in
>>> > > > >>>>> the document. Currently I’m thinking that its ok to add a
>>> method
>>> > to
>>> > > > >>>>> Executor for retrieving a JobClient for a running job by
>>> > providing
>>> > > an
>>> > > > >> ID.
>>> > > > >>>>> Let’s see what Kostas has to say on the topic.
>>> > > > >>>>>>
>>> > > > >>>>>> Best,
>>> > > > >>>>>> Aljoscha
>>> > > > >>>>>>
>>> > > > >>>>>>> On 25. Sep 2019, at 12:31, Zili Chen <[hidden email]
>>> >
>>> > > wrote:
>>> > > > >>>>>>>
>>> > > > >>>>>>> Hi all,
>>> > > > >>>>>>>
>>> > > > >>>>>>> Summary from the discussion about introducing Flink
>>> JobClient
>>> > > > >> API[1]
>>> > > > >>>>> we
>>> > > > >>>>>>> draft FLIP-74[2] to
>>> > > > >>>>>>> gather thoughts and towards a standard public user-facing
>>> > > > >> interfaces.
>>> > > > >>>>>>>
>>> > > > >>>>>>> This discussion thread aims at standardizing job level
>>> client
>>> > > API.
>>> > > > >>>>> But I'd
>>> > > > >>>>>>> like to emphasize that
>>> > > > >>>>>>> how to retrieve JobClient possibly causes further
>>> discussion on
>>> > > > >>>>> different
>>> > > > >>>>>>> level clients exposed from
>>> > > > >>>>>>> Flink so that a following thread will be started later to
>>> > > > >> coordinate
>>> > > > >>>>>>> FLIP-73 and FLIP-74 on
>>> > > > >>>>>>> expose issue.
>>> > > > >>>>>>>
>>> > > > >>>>>>> Looking forward to your opinions.
>>> > > > >>>>>>>
>>> > > > >>>>>>> Best,
>>> > > > >>>>>>> tison.
>>> > > > >>>>>>>
>>> > > > >>>>>>> [1]
>>> > > > >>>>>>>
>>> > > > >>>>>
>>> > > > >>
>>> > > >
>>> > >
>>> >
>>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>>> > > > >>>>>>> [2]
>>> > > > >>>>>>>
>>> > > > >>>>>
>>> > > > >>
>>> > > >
>>> > >
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
>>> > > > >>>>>>
>>> > > > >>>>>
>>> > > > >>>>
>>> > > >
>>> > > >
>>> > >
>>> > > --
>>> > >
>>> > > Konstantin Knauf | Solutions Architect
>>> > >
>>> > > +49 160 91394525
>>> > >
>>> > >
>>> > > Follow us @VervericaData Ververica <https://www.ververica.com/>
>>> > >
>>> > >
>>> > > --
>>> > >
>>> > > Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> > > Conference
>>> > >
>>> > > Stream Processing | Event Driven | Real Time
>>> > >
>>> > > --
>>> > >
>>> > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>> > >
>>> > > --
>>> > > Ververica GmbH
>>> > > Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>> Ji
>>> > > (Tony) Cheng
>>> > >
>>> >
>>>
>>>
>>>
>>

Re: [DISCUSS] FLIP-74: Flink JobClient API

tison
s/on the context if/on the context of/
s/dummy/dumb/

Re: [DISCUSS] FLIP-74: Flink JobClient API

Kostas Kloudas-4
Hi Tison,

I see. Then I would say that, as a first step and to see if people are
happy with the result, integration with the production code can go
through the new executeAsync() method on the Executor that we discussed
earlier.

This method could potentially be exposed on ExecutionEnvironment as a
new flavour of execute() that returns a JobClient.

In the future we could consider exposing it through a
ClusterClientFactory (or sth similar).

What do you think?
Kostas

On Thu, Oct 3, 2019 at 10:12 AM Zili Chen <[hidden email]> wrote:
>
> s/on the context if/on the context of/
> s/dummy/dumb/
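For readers following along, a minimal sketch of the shape Kostas is proposing might look like the following. The names (Executor, executeAsync, JobClient) come from the FLIP-73/FLIP-74 discussion; the types and bodies here are simplified, illustrative stand-ins using only JDK types, not Flink's actual API.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative stand-ins for the interfaces under discussion. The names
// (Executor, executeAsync, JobClient) come from FLIP-73/FLIP-74; the types
// and bodies here are simplified placeholders, not Flink's actual API.
interface JobClient {
    String getJobId();

    // Result type simplified to String for the sketch.
    CompletableFuture<String> getJobExecutionResult();
}

interface Executor {
    // Proposed addition: submit a pipeline and asynchronously obtain a
    // JobClient for it, instead of blocking until the job finishes.
    CompletableFuture<JobClient> executeAsync(String pipeline);
}

// A dumb in-memory Executor, just to make the calling pattern concrete.
class LocalExecutor implements Executor {
    @Override
    public CompletableFuture<JobClient> executeAsync(String pipeline) {
        JobClient client = new JobClient() {
            @Override
            public String getJobId() { return "job-0"; }

            @Override
            public CompletableFuture<String> getJobExecutionResult() {
                return CompletableFuture.completedFuture("DONE: " + pipeline);
            }
        };
        return CompletableFuture.completedFuture(client);
    }
}
```

With this shape, the old NON-DETACHED behaviour becomes plain user code — executeAsync(p).thenCompose(JobClient::getJobExecutionResult).join() — which is why the thread argues the client-side DETACHED flag loses its meaning.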

Re: [DISCUSS] FLIP-74: Flink JobClient API

tison
Hi Kostas,

That is exactly how things went in my mind, and the reason I
said "integration to be a follow-up discussion" :-)

Best,
tison.


Kostas Kloudas <[hidden email]> wrote on Thu, Oct 3, 2019, at 6:23 PM:

> Hi Tison,
>
> I see. Then I would say that as a first step, and to see if people are
> happy with the result,
> integration with the production code can be through a new method
> executeAsync() in the Executor
> that we discussed earlier.
>
> This method could potentially be exposed to ExecutionEnvironment as a
> new flavour of
> the execute that returns a JobClient.
>
> In the future we could consider exposing it through a
> ClusterClientFactory (or sth similar).
>
> What do you think
> Kostas
>
> On Thu, Oct 3, 2019 at 10:12 AM Zili Chen <[hidden email]> wrote:
> >
> > s/on the context if/on the context of/
> > s/dummy/dumb/
>
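The "ClusterClientFactory (or sth similar)" mentioned above, combined with point iii) of Kostas's earlier mail, could be sketched roughly as follows. Every name here (ClusterClientFactory, createCluster, retrieveJobClient, and the placeholder Config/ClusterClient/JobClient types) is hypothetical, taken from the discussion rather than from an existing Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal placeholder types so the sketch is self-contained.
class Config {
    final Map<String, String> options = new HashMap<>();
}

interface ClusterClient {
    String clusterId();
}

interface JobClient {
    String getJobId();
}

// Hypothetical factory from point iii) of Kostas's earlier mail: a component
// that talks to the cluster manager (what ClusterDescriptor does today) and
// hands out clients, so the Executor stays focused on execute() alone.
interface ClusterClientFactory {
    // Deploy a new cluster described by the configuration.
    ClusterClient createCluster(Config configuration);

    // Attach to an already running job by id -- the "retrieve a JobClient
    // for a running job" case that this thread defers to a follow-up.
    JobClient retrieveJobClient(Config configuration, String jobId);
}

// A dumb in-memory factory to make the call pattern concrete.
class LocalClusterClientFactory implements ClusterClientFactory {
    @Override
    public ClusterClient createCluster(Config configuration) {
        return () -> "local-cluster";
    }

    @Override
    public JobClient retrieveJobClient(Config configuration, String jobId) {
        return () -> jobId;
    }
}
```

The point of the separation is that cluster lifecycle and attachment to existing jobs live in the factory, while execute()/executeAsync() stay the Executor's only job.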

Re: [DISCUSS] FLIP-74: Flink JobClient API

Aljoscha Krettek-2
In reply to this post by tison
This makes sense to me, yes!

> On 2. Oct 2019, at 20:35, Zili Chen <[hidden email]> wrote:
>
> Hi all,
>
> Narrowing the scope to FLIP-74, we aim to introduce a useful and extensible
> user-facing public interface, JobClient. Let me re-emphasize the two major
> work items under this thread.
>
> 1. standard interface
>
> As FLIP-74 introduces an interface JobClient with its methods, we'd like to
> make it a standard (non-final, since we can always extend it on demand)
> interface.
>
> On this branch I'd like to, with respect to Konstantin's suggestion, 1)
> exclude deprecated
> cancelWithSavepoint from the proposal 2) rename stopWithSavepoint to stop
> to keep
> consistency with our CLI command. If there are no more concerns on these
> topics, I will update the proposal tomorrow.
>
> 2. client interfaces are asynchronous
>
> If the asynchronous JobClient interfaces are approved, a necessary proposed
> change is a corresponding update to the ClusterClient interfaces.
> ClusterClient remains an internal concept after this FLIP, but the change
> might have some impact, so I think it's better to reach a community
> consensus as a prerequisite. Note that once all client methods are
> asynchronous, the client-side detached option has no effect, whether or
> not we remove it.
>
> Let me know your ideas on these topics and let's keep moving forward :-)
>
> Best,
> tison.
>
>
> Zili Chen <[hidden email]> wrote on Wed, Oct 2, 2019, at 4:10 PM:
>
>> Hi Konstantin,
>>
>> * should we add "cancelWithSavepeoint" to a new public API, when we have
>> deprecated the corresponding REST API/CLI methods? In my understanding
>> there is no reason to use it anymore.
>>
>> Good point. We can exclude "cancelWithSavepoint" from public API at least
>> for now,
>> since it is deprecated already. Let's see if there is other concerns.
>>
>> * should we call "stopWithSavepoint" simply "stop" as "stop" always
>> performs a savepoint?
>>
>> Well for naming issue I'm fine with that if it is a consensus of our
>> community. I can see
>> there is a "stop" CLI command which means "stop with savepoint".
>>
>> Best,
>> tison.
>>
>>
>> Konstantin Knauf <[hidden email]> wrote on Mon, Sep 30, 2019, at 12:16 PM:
>>
>>> Hi Thomas,
>>>
>>> maybe there is a misunderstanding. There is no plan to deprecate anything
>>> in the REST API in the process of introducing the JobClient API, and it
>>> shouldn't.
>>>
>>> Since "cancel with savepoint" was already deprecated in the REST API and
>>> CLI, I am just raising the question whether to add it to the JobClient API
>>> in the first place.
>>>
>>> Best,
>>>
>>> Konstantin
>>>
>>>
>>>
>>> On Mon, Sep 30, 2019 at 1:16 AM Thomas Weise <[hidden email]> wrote:
>>>
>>>> I did not realize there was a plan to deprecate anything in the REST
>>> API?
>>>>
>>>> The REST API is super important for tooling written in non JVM
>>> languages,
>>>> that does not include a Flink client (like FlinkK8sOperator). The REST
>>> API
>>>> should continue to support all job management operations, including job
>>>> submission.
>>>>
>>>> Thomas
>>>>
>>>>
>>>> On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf <
>>> [hidden email]
>>>>>
>>>> wrote:
>>>>
>>>>> Hi Zili,
>>>>>
>>>>> thanks for working on this topic. Just read through the FLIP and I
>>> have
>>>> two
>>>>> questions:
>>>>>
>>>>> * should we add "cancelWithSavepeoint" to a new public API, when we
>>> have
>>>>> deprecated the corresponding REST API/CLI methods? In my understanding
>>>>> there is no reason to use it anymore.
>>>>> * should we call "stopWithSavepoint" simply "stop" as "stop" always
>>>>> performs a savepoint?
>>>>>
>>>>> Best,
>>>>>
>>>>> Konstantin
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek <
>>> [hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> I agree that this would be good to have. But I also think that this
>>> is
>>>>>> outside the scope of FLIP-74, I think it is an orthogonal feature.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>>> On 27. Sep 2019, at 10:31, Flavio Pompermaier <
>>> [hidden email]>
>>>>>> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>> just a remark about the Flink REST APIs (and its client as well):
>>>>> almost
>>>>>>> all the times we need a way to dynamically know which jobs are
>>>>> contained
>>>>>> in
>>>>>>> a jar file, and this could be exposed by the REST endpoint under
>>>>>>> /jars/:jarid/entry-points (a simple way to implement this would
>>> be to
>>>>>> check
>>>>>>> the value of Main-class or Main-classes inside the Manifest of the
>>>> jar
>>>>> if
>>>>>>> they exists [1]).
>>>>>>>
>>>>>>> I understand that this is something that is not strictly required
>>> to
>>>>>>> execute Flink jobs but IMHO it would ease A LOT the work of UI
>>>>> developers
>>>>>>> that could have a way to show the users all available jobs inside
>>> a
>>>>> jar +
>>>>>>> their configurable parameters.
>>>>>>> For example, right now in the WebUI, you can upload a jar and then
>>>> you
>>>>>> have
>>>>>>> to set (without any autocomplete or UI support) the main class and
>>>>> their
>>>>>>> params (for example using a string like --param1 xx --param2 yy).
>>>>>>> Adding this functionality to the REST API and the respective
>>> client
>>>>> would
>>>>>>> enable the WebUI (and all UIs interacting with a Flink cluster) to
>>>>>> prefill
>>>>>>> a dropdown list containing the list of entry-point classes (i.e.
>>>> Flink
>>>>>>> jobs) and, once selected, their required (typed) parameters.
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-10864
>>>>>>>
>>>>>>> On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]>
>>>>> wrote:
>>>>>>>
>>>>>>>> modify
>>>>>>>>
>>>>>>>> /we just shutdown the cluster on the exit of client that running
>>>>> inside
>>>>>>>> cluster/
>>>>>>>>
>>>>>>>> to
>>>>>>>>
>>>>>>>> we just shutdown the cluster on both the exit of client that
>>> running
>>>>>> inside
>>>>>>>> cluster and the finish of job.
>>>>>>>> Since client is running inside cluster we can easily wait for the
>>>> end
>>>>> of
>>>>>>>> two both in ClusterEntrypoint.
>>>>>>>>
>>>>>>>>
>>>>>>>> Zili Chen <[hidden email]> wrote on Fri, Sep 27, 2019, at 3:13 PM:
>>>>>>>>
>>>>>>>>> About JobCluster
>>>>>>>>>
>>>>>>>>> Actually I am not quite sure what we gains from DETACHED
>>>>> configuration
>>>>>> on
>>>>>>>>> cluster side.
>>>>>>>>> We don't have a NON-DETACHED JobCluster in fact in our codebase,
>>>>> right?
>>>>>>>>>
>>>>>>>>> It comes to me one major questions we have to answer first.
>>>>>>>>>
>>>>>>>>> *What JobCluster conceptually is exactly*
>>>>>>>>>
>>>>>>>>> Related discussion can be found in JIRA[1] and mailing list[2].
>>>>> Stephan
>>>>>>>>> gives a nice
>>>>>>>>> description of JobCluster:
>>>>>>>>>
>>>>>>>>> Two things to add: - The job mode is very nice in the way that
>>> it
>>>>> runs
>>>>>>>> the
>>>>>>>>> client inside the cluster (in the same image/process that is the
>>>> JM)
>>>>>> and
>>>>>>>>> thus unifies both applications and what the Spark world calls
>>> the
>>>>>> "driver
>>>>>>>>> mode". - Another thing I would add is that during the FLIP-6
>>>> design,
>>>>> we
>>>>>>>>> were thinking about setups where Dispatcher and JobManager are
>>>>> separate
>>>>>>>>> processes. A Yarn or Mesos Dispatcher of a session could run
>>>>>>>> independently
>>>>>>>>> (even as privileged processes executing no code). Then you the
>>>>>> "per-job"
>>>>>>>>> mode could still be helpful: when a job is submitted to the
>>>>> dispatcher,
>>>>>>>> it
>>>>>>>>> launches the JM again in a per-job mode, so that JM and TM
>>>> processes
>>>>>> are
>>>>>>>>> bound to teh job only. For higher security setups, it is
>>> important
>>>>> that
>>>>>>>>> processes are not reused across jobs.
>>>>>>>>>
>>>>>>>>> However, currently in "per-job" mode we generate JobGraph in
>>> client
>>>>>> side,
>>>>>>>>> launching
>>>>>>>>> the JobCluster and retrieve the JobGraph for execution. So
>>>> actually,
>>>>> we
>>>>>>>>> don't "run the
>>>>>>>>> client inside the cluster".
>>>>>>>>>
>>>>>>>>> Besides, refer to the discussion with Till[1], it would be
>>> helpful
>>>> we
>>>>>>>>> follow the same process
>>>>>>>>> of session mode for that of "per-job" mode in user perspective,
>>>> that
>>>>> we
>>>>>>>>> don't use
>>>>>>>>> OptimizedPlanEnvironment to create JobGraph, but directly deploy
>>>>> Flink
>>>>>>>>> cluster in env.execute.
>>>>>>>>>
>>>>>>>>> Generally 2 points
>>>>>>>>>
>>>>>>>>> 1. Running Flink job by invoke user main method and execute
>>>>> throughout,
>>>>>>>>> instead of create
>>>>>>>>> JobGraph from main-class.
>>>>>>>>> 2. Run the client inside the cluster.
>>>>>>>>>
>>>>>>>>> If 1 and 2 are implemented. There is obvious no need for
>>> DETACHED
>>>>> mode
>>>>>> in
>>>>>>>>> cluster side
>>>>>>>>> because we just shutdown the cluster on the exit of client that
>>>>> running
>>>>>>>>> inside cluster. Whether
>>>>>>>>> or not delivered the result is up to user code.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388
>>>>>>>>> [2]
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>> https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Zili Chen <[hidden email]> wrote on Fri, Sep 27, 2019, at 2:13 PM:
>>>>>>>>>
>>>>>>>>>> Thanks for your replies Kostas & Aljoscha!
>>>>>>>>>>
>>>>>>>>>> Below are replies point by point.
>>>>>>>>>>
>>>>>>>>>> 1. For DETACHED mode, what I said there is about the DETACHED
>>> mode
>>>>> in
>>>>>>>>>> client side.
>>>>>>>>>> There are two configurations overload the item DETACHED[1].
>>>>>>>>>>
>>>>>>>>>> In client side, it means whether or not client.submitJob is
>>>> blocking
>>>>>> to
>>>>>>>>>> job execution result.
>>>>>>>>>> Due to client.submitJob returns CompletableFuture<JobClient>
>>>>>>>> NON-DETACHED
>>>>>>>>>> is no
>>>>>>>>>> power at all. Caller of submitJob makes the decision whether or
>>>> not
>>>>>>>>>> blocking to get the
>>>>>>>>>> JobClient and request for the job execution result. If client
>>>>> crashes,
>>>>>>>> it
>>>>>>>>>> is a user scope
>>>>>>>>>> exception that should be handled in user code; if client lost
>>>>>> connection
>>>>>>>>>> to cluster, we have
>>>>>>>>>> a retry times and interval configuration that automatically
>>> retry
>>>>> and
>>>>>>>>>> throws an user scope
>>>>>>>>>> exception if exceed.
>>>>>>>>>>
>>>>>>>>>> Your comment about poll for result or job result sounds like a
>>>>> concern
>>>>>>>> on
>>>>>>>>>> cluster side.
>>>>>>>>>>
>>>>>>>>>> In cluster side, DETACHED mode is alive only in JobCluster. If
>>>>>> DETACHED
>>>>>>>>>> configured,
>>>>>>>>>> JobCluster exits on job finished; if NON-DETACHED configured,
>>>>>> JobCluster
>>>>>>>>>> exits on job
>>>>>>>>>> execution result delivered. FLIP-74 doesn't stick to changes on
>>>> this
>>>>>>>>>> scope, it is just remained.
>>>>>>>>>>
>>>>>>>>>> However, it is an interesting part we can revisit this
>>>>> implementation
>>>>>> a
>>>>>>>>>> bit.
>>>>>>>>>>
>>>>>>>>>> <see the next email for compact reply in this one>
>>>>>>>>>>
>>>>>>>>>> 2. The retrieval of JobClient is so important that if we don't
>>>> have
>>>>> a
>>>>>>>> way
>>>>>>>>>> to retrieve JobClient it is
>>>>>>>>>> a dumb public user-facing interface(what a strange state :P).
>>>>>>>>>>
>>>>>>>>>> About the retrieval of JobClient, as mentioned in the document,
>>>> two
>>>>>> ways
>>>>>>>>>> should be supported.
>>>>>>>>>>
>>>>>>>>>> (1). Retrieved as return type of job submission.
>>>>>>>>>> (2). Retrieve a JobClient of existing job.(with job id)
>>>>>>>>>>
>>>>>>>>>> I highly respect your thoughts about how Executors should be
>>> and
>>>>>>>> thoughts
>>>>>>>>>> on multi-layered clients.
>>>>>>>>>> Although, (2) is not supported by public interfaces as summary
>>> of
>>>>>>>>>> discussion above, we can discuss
>>>>>>>>>> a bit on the place of Executors on multi-layered clients and
>>> find
>>>> a
>>>>>> way
>>>>>>>>>> to retrieve JobClient of
>>>>>>>>>> existing job with public client API. I will comment in FLIP-73
>>>>>> thread[2]
>>>>>>>>>> since it is almost about Executors.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> tison.
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8
>>>>>>>>>> [2]
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>> https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Kostas Kloudas <[hidden email]> wrote on Wed, Sep 25, 2019, at 9:29 PM:
>>>>>>>>>>
>>>>>>>>>>> Hi Tison,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the FLIP and launching the discussion!
>>>>>>>>>>>
>>>>>>>>>>> As a first note, big +1 on providing/exposing a JobClient to
>>> the
>>>>>> users!
>>>>>>>>>>>
>>>>>>>>>>> Some points that would be nice to be clarified:
>>>>>>>>>>> 1) You mention that we can get rid of the DETACHED mode: I
>>> agree
>>>>> that
>>>>>>>>>>> at a high level, given that everything will now be
>>> asynchronous,
>>>>>> there
>>>>>>>>>>> is no need to keep the DETACHED mode but I think we should
>>>> specify
>>>>>>>>>>> some aspects. For example, without the explicit separation of
>>> the
>>>>>>>>>>> modes, what happens when the job finishes. Does the client
>>>>>>>>>>> periodically poll for the result always or the result is
>>> pushed
>>>>> when
>>>>>>>>>>> in NON-DETACHED mode? What happens if the client disconnects
>>> and
>>>>>>>>>>> reconnects?
>>>>>>>>>>>
>>>>>>>>>>> 2) On the "how to retrieve a JobClient for a running Job", I
>>>> think
>>>>>>>>>>> this is related to the other discussion you opened in the ML
>>>> about
>>>>>>>>>>> multi-layered clients. First of all, I agree that exposing
>>>>> different
>>>>>>>>>>> "levels" of clients would be a nice addition, and actually
>>> there
>>>>> have
>>>>>>>>>>> been some discussions about doing so in the future. Now for
>>> this
>>>>>>>>>>> specific discussion:
>>>>>>>>>>>     i) I do not think that we should expose the
>>>>>>>>>>> ClusterDescriptor/ClusterSpecification to the user, as this
>>> ties
>>>> us
>>>>>> to
>>>>>>>>>>> a specific architecture which may change in the future.
>>>>>>>>>>>    ii) I do not think it should be the Executor that will
>>>> provide
>>>>> a
>>>>>>>>>>> JobClient for an already running job (only for the Jobs that
>>> it
>>>>>>>>>>> submits). The job of the executor should just be to execute()
>>> a
>>>>>>>>>>> pipeline.
>>>>>>>>>>>    iii) I think a solution that respects the separation of
>>>>> concerns
>>>>>>>>>>> could be the addition of another component (in the future),
>>>>> something
>>>>>>>>>>> like a ClientFactory, or ClusterFactory that will have methods
>>>>> like:
>>>>>>>>>>> ClusterClient createCluster(Configuration), JobClient
>>>>>>>>>>> retrieveJobClient(Configuration , JobId), maybe even (although
>>>> not
>>>>>>>>>>> sure) Executor getExecutor(Configuration ) and maybe more.
>>> This
>>>>>>>>>>> component would be responsible to interact with a cluster
>>> manager
>>>>>> like
>>>>>>>>>>> Yarn and do what is now being done by the ClusterDescriptor
>>> plus
>>>>> some
>>>>>>>>>>> more stuff.
>>>>>>>>>>>
>>>>>>>>>>> Although under the hood all these abstractions (Environments,
>>>>>>>>>>> Executors, ...) underneath use the same clients, I believe
>>> their
>>>>>>>>>>> job/existence is not contradicting but they simply hide some
>>> of
>>>> the
>>>>>>>>>>> complexity from the user, and give us, as developers some
>>> freedom
>>>>> to
>>>>>>>>>>> change in the future some of the parts. For example, the
>>> executor
>>>>>> will
>>>>>>>>>>> take a Pipeline, create a JobGraph and submit it, instead of
>>>>>> requiring
>>>>>>>>>>> the user to do each step separately. This allows us to, for
>>>>> example,
>>>>>>>>>>> get rid of the Plan if in the future everything is DataStream.
>>>>>>>>>>> Essentially, I think of these as layers of an onion with the
>>>>> clients
>>>>>>>>>>> being close to the core. The higher you go, the more
>>>> functionality
>>>>> is
>>>>>>>>>>> included and hidden from the public eye.
>>>>>>>>>>>
>>>>>>>>>>> Point iii) by the way is just a thought and by no means
>>> final. I
>>>>> also
>>>>>>>>>>> like the idea of multi-layered clients so this may spark up
>>> the
>>>>>>>>>>> discussion.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Kostas
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek <
>>>>>> [hidden email]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Tison,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for proposing the document! I had some comments on the
>>>>>>>> document.
>>>>>>>>>>>>
>>>>>>>>>>>> I think the only complex thing that we still need to figure
>>> out
>>>> is
>>>>>>>> how
>>>>>>>>>>> to get a JobClient for a job that is already running. As you
>>>>>> mentioned
>>>>>>>> in
>>>>>>>>>>> the document. Currently I’m thinking that its ok to add a
>>> method
>>>> to
>>>>>>>>>>> Executor for retrieving a JobClient for a running job by
>>>> providing
>>>>> an
>>>>>>>> ID.
>>>>>>>>>>> Let’s see what Kostas has to say on the topic.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>
>>>>>>>>>>>>> On 25. Sep 2019, at 12:31, Zili Chen <[hidden email]>
>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Summary from the discussion about introducing Flink
>>> JobClient
>>>>>>>> API[1]
>>>>>>>>>>> we
>>>>>>>>>>>>> draft FLIP-74[2] to
>>>>>>>>>>>>> gather thoughts and towards a standard public user-facing
>>>>>>>> interfaces.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This discussion thread aims at standardizing job level
>>> client
>>>>> API.
>>>>>>>>>>> But I'd
>>>>>>>>>>>>> like to emphasize that
>>>>>>>>>>>>> how to retrieve JobClient possibly causes further
>>> discussion on
>>>>>>>>>>> different
>>>>>>>>>>>>> level clients exposed from
>>>>>>>>>>>>> Flink so that a following thread will be started later to
>>>>>>>> coordinate
>>>>>>>>>>>>> FLIP-73 and FLIP-74 on
>>>>>>>>>>>>> expose issue.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Looking forward to your opinions.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> tison.
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>>
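Putting the points of this sub-thread together (all operations asynchronous, the deprecated cancelWithSavepoint excluded, stop-with-savepoint renamed to stop to match the CLI, no DETACHED flag), a consolidated JobClient could look roughly like the sketch below. This is one reading of the discussion, not the interface that was finally merged; JobStatus and the result/path types are simplified placeholders.

```java
import java.util.concurrent.CompletableFuture;

// A reading of the sub-thread's consensus: every operation asynchronous,
// the deprecated cancelWithSavepoint excluded, and stop-with-savepoint
// named stop() to match the CLI. JobStatus and the result/path types are
// simplified placeholders, not the interface that was finally merged.
interface JobClient {
    enum JobStatus { RUNNING, FINISHED, CANCELED, FAILED }

    String getJobId();

    CompletableFuture<JobStatus> getJobStatus();

    // Cancel without taking a savepoint.
    CompletableFuture<Void> cancel();

    // "stop" always takes a savepoint, mirroring the `flink stop` command;
    // the future completes with the savepoint path.
    CompletableFuture<String> stop(String targetDirectory);

    // Trigger a savepoint without stopping the job.
    CompletableFuture<String> triggerSavepoint(String targetDirectory);

    // No DETACHED flag needed: callers decide whether to join() this future.
    CompletableFuture<String> getJobExecutionResult();
}

// A no-op stand-in showing how user code would drive the interface.
class NoOpJobClient implements JobClient {
    @Override public String getJobId() { return "job-1"; }

    @Override public CompletableFuture<JobStatus> getJobStatus() {
        return CompletableFuture.completedFuture(JobStatus.FINISHED);
    }

    @Override public CompletableFuture<Void> cancel() {
        return CompletableFuture.completedFuture(null);
    }

    @Override public CompletableFuture<String> stop(String targetDirectory) {
        return CompletableFuture.completedFuture(targetDirectory + "/savepoint-1");
    }

    @Override public CompletableFuture<String> triggerSavepoint(String targetDirectory) {
        return CompletableFuture.completedFuture(targetDirectory + "/savepoint-2");
    }

    @Override public CompletableFuture<String> getJobExecutionResult() {
        return CompletableFuture.completedFuture("OK");
    }
}
```

Because every method returns a CompletableFuture, whether a caller blocks (the old NON-DETACHED behaviour) or fires and forgets (the old DETACHED behaviour) is purely a user-code decision.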


Re: [DISCUSS] FLIP-74: Flink JobClient API

Kostas Kloudas-4
I also agree, @Zili Chen!

On Fri, Oct 4, 2019 at 10:17 AM Aljoscha Krettek <[hidden email]> wrote:

>
> This makes sense to me, yes!
>
> > On 2. Oct 2019, at 20:35, Zili Chen <[hidden email]> wrote:
> >
> > Hi all,
> >
> > Narrow the scope to FLIP-74 we aimed at introduce a useful and extensible
> > user-facing public interface JobClient. Let me reemphasize two major works
> > under this thread.
> >
> > 1. standard interface
> >
> > As in FLIP-74 we introduce an interface JobClient with its methods, we'd
> > like to
> > make it a standard (non-final since we can always extends on demand)
> > interface.
> >
> > On this branch I'd like to, with respect to Konstantin's suggestion, 1)
> > exclude deprecated
> > cancelWithSavepoint from the proposal 2) rename stopWithSavepoint to stop
> > to keep
> > consistency with our CLI command. If there is no more concern on these
> > topics I will
> > update proposal tomorrow.
> >
> > 2. client interfaces are asynchronous
> >
> > If the asynchronous JobClient interfaces approved, a necessary proposed
> > changed is
> > corresponding update ClusterClient interfaces. Still ClusterClient is an
> > internal concept
> > after this FLIP but it might have some impact so I think it's better to
> > reach a community
> > consensus as prerequisite. Note that with all client methods are
> > asynchronous, no matter
> > whether or not we remove client side detach option it is no power.
> >
> > Let me know your ideas on these topic and keep moving forward :-)
> >
> > Best,
> > tison.
> >
> >
> > Zili Chen <[hidden email]> wrote on Wed, Oct 2, 2019, at 4:10 PM:
> >
> >> Hi Konstantin,
> >>
> >> * should we add "cancelWithSavepeoint" to a new public API, when we have
> >> deprecated the corresponding REST API/CLI methods? In my understanding
> >> there is no reason to use it anymore.
> >>
> >> Good point. We can exclude "cancelWithSavepoint" from public API at least
> >> for now,
> >> since it is deprecated already. Let's see if there is other concerns.
> >>
> >> * should we call "stopWithSavepoint" simply "stop" as "stop" always
> >> performs a savepoint?
> >>
> >> Well for naming issue I'm fine with that if it is a consensus of our
> >> community. I can see
> >> there is a "stop" CLI command which means "stop with savepoint".
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> Konstantin Knauf <[hidden email]> wrote on Mon, Sep 30, 2019, at 12:16 PM:
> >>
> >>> Hi Thomas,
> >>>
> >>> maybe there is a misunderstanding. There is no plan to deprecate anything
> >>> in the REST API in the process of introducing the JobClient API, and it
> >>> shouldn't.
> >>>
> >>> Since "cancel with savepoint" was already deprecated in the REST API and
> >>> CLI, I am just raising the question whether to add it to the JobClient API
> >>> in the first place.
> >>>
> >>> Best,
> >>>
> >>> Konstantin
> >>>
> >>>
> >>>
> >>> On Mon, Sep 30, 2019 at 1:16 AM Thomas Weise <[hidden email]> wrote:
> >>>
> >>>> I did not realize there was a plan to deprecate anything in the REST
> >>> API?
> >>>>
> >>>> The REST API is super important for tooling written in non JVM
> >>> languages,
> >>>> that does not include a Flink client (like FlinkK8sOperator). The REST
> >>> API
> >>>> should continue to support all job management operations, including job
> >>>> submission.
> >>>>
> >>>> Thomas
> >>>>
> >>>>
> >>>> On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf <
> >>> [hidden email]
> >>>>>
> >>>> wrote:
> >>>>
> >>>>> Hi Zili,
> >>>>>
> >>>>> thanks for working on this topic. Just read through the FLIP and I
> >>> have
> >>>> two
> >>>>> questions:
> >>>>>
> >>>>> * should we add "cancelWithSavepeoint" to a new public API, when we
> >>> have
> >>>>> deprecated the corresponding REST API/CLI methods? In my understanding
> >>>>> there is no reason to use it anymore.
> >>>>> * should we call "stopWithSavepoint" simply "stop" as "stop" always
> >>>>> performs a savepoint?
> >>>>>
> >>>>> Best,
> >>>>>
> >>>>> Konstantin
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek <
> >>> [hidden email]>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Flavio,
> >>>>>>
> >>>>>> I agree that this would be good to have. But I also think that this
> >>> is
> >>>>>> outside the scope of FLIP-74, I think it is an orthogonal feature.
> >>>>>>
> >>>>>> Best,
> >>>>>> Aljoscha
> >>>>>>
> >>>>>>> On 27. Sep 2019, at 10:31, Flavio Pompermaier <
> >>> [hidden email]>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Hi all,
> >>>>>>> just a remark about the Flink REST APIs (and its client as well):
> >>>>> almost
> >>>>>>> all the times we need a way to dynamically know which jobs are
> >>>>> contained
> >>>>>> in
> >>>>>>> a jar file, and this could be exposed by the REST endpoint under
> >>>>>>> /jars/:jarid/entry-points (a simple way to implement this would
> >>> be to
> >>>>>> check
> >>>>>>> the value of Main-class or Main-classes inside the Manifest of the
> >>>> jar
> >>>>> if
> >>>>>>> they exist [1]).
> >>>>>>>
> >>>>>>> I understand that this is something that is not strictly required
> >>> to
> >>>>>>> execute Flink jobs but IMHO it would ease A LOT the work of UI
> >>>>> developers
> >>>>>>> that could have a way to show the users all available jobs inside
> >>> a
> >>>>> jar +
> >>>>>>> their configurable parameters.
> >>>>>>> For example, right now in the WebUI, you can upload a jar and then
> >>>> you
> >>>>>> have
> >>>>>>> to set (without any autocomplete or UI support) the main class and
> >>>>> their
> >>>>>>> params (for example using a string like --param1 xx --param2 yy).
> >>>>>>> Adding this functionality to the REST API and the respective
> >>> client
> >>>>> would
> >>>>>>> enable the WebUI (and all UIs interacting with a Flink cluster) to
> >>>>>> prefill
> >>>>>>> a dropdown list containing the list of entry-point classes (i.e.
> >>>> Flink
> >>>>>>> jobs) and, once selected, their required (typed) parameters.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Flavio
> >>>>>>>
> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-10864
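
For illustration, a minimal sketch of the manifest lookup Flavio describes; the "Main-Classes" attribute is an assumption taken from the linked issue (only "Main-Class" is standard), and the Manifest is built in memory here, where a real REST handler would read it via new JarFile(path).getManifest():

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

public class EntryPointLookup {

    // Prefer the hypothetical multi-entry "Main-Classes" attribute, falling
    // back to the standard "Main-Class" attribute.
    static String entryPoints(Manifest manifest) {
        Attributes attrs = manifest.getMainAttributes();
        String multiple = attrs.getValue("Main-Classes");
        return multiple != null ? multiple : attrs.getValue(Attributes.Name.MAIN_CLASS);
    }

    public static void main(String[] args) throws Exception {
        // In-memory manifest keeps the sketch self-contained; a REST handler
        // would obtain it from the uploaded jar instead.
        String raw = "Manifest-Version: 1.0\r\nMain-Class: org.example.WordCount\r\n\r\n";
        Manifest manifest =
                new Manifest(new ByteArrayInputStream(raw.getBytes(StandardCharsets.UTF_8)));
        System.out.println(entryPoints(manifest));
    }
}
```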
> >>>>>>>
> >>>>>>> On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> modify
> >>>>>>>>
> >>>>>>>> /we just shutdown the cluster on the exit of client that running
> >>>>> inside
> >>>>>>>> cluster/
> >>>>>>>>
> >>>>>>>> to
> >>>>>>>>
> >>>>>>>> we just shutdown the cluster on both the exit of client that
> >>> running
> >>>>>> inside
> >>>>>>>> cluster and the finish of job.
> >>>>>>>> Since client is running inside cluster we can easily wait for the
> >>>> end
> >>>>> of
> >>>>>>>> both in ClusterEntrypoint.
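
The condition described above can be sketched with plain futures; the names (clientExited, jobFinished) are illustrative stand-ins, not ClusterEntrypoint API:

```java
import java.util.concurrent.CompletableFuture;

public class ShutdownCondition {
    public static void main(String[] args) {
        // Stand-ins for the two termination signals the entrypoint would watch.
        CompletableFuture<Void> clientExited = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> jobFinished = CompletableFuture.completedFuture(null);

        // Shut the cluster down only when BOTH the embedded client has exited
        // and the job has finished.
        CompletableFuture.allOf(clientExited, jobFinished)
                .thenRun(() -> System.out.println("shutting down cluster"))
                .join();
    }
}
```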
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午3:13写道:
> >>>>>>>>
> >>>>>>>>> About JobCluster
> >>>>>>>>>
> >>>>>>>>> Actually I am not quite sure what we gain from DETACHED
> >>>>> configuration
> >>>>>> on
> >>>>>>>>> cluster side.
> >>>>>>>>> We don't have a NON-DETACHED JobCluster in fact in our codebase,
> >>>>> right?
> >>>>>>>>>
> >>>>>>>>> One major question comes to mind that we have to answer first.
> >>>>>>>>>
> >>>>>>>>> *What JobCluster conceptually is exactly*
> >>>>>>>>>
> >>>>>>>>> Related discussion can be found in JIRA[1] and mailing list[2].
> >>>>> Stephan
> >>>>>>>>> gives a nice
> >>>>>>>>> description of JobCluster:
> >>>>>>>>>
> >>>>>>>>> Two things to add: - The job mode is very nice in the way that
> >>> it
> >>>>> runs
> >>>>>>>> the
> >>>>>>>>> client inside the cluster (in the same image/process that is the
> >>>> JM)
> >>>>>> and
> >>>>>>>>> thus unifies both applications and what the Spark world calls
> >>> the
> >>>>>> "driver
> >>>>>>>>> mode". - Another thing I would add is that during the FLIP-6
> >>>> design,
> >>>>> we
> >>>>>>>>> were thinking about setups where Dispatcher and JobManager are
> >>>>> separate
> >>>>>>>>> processes. A Yarn or Mesos Dispatcher of a session could run
> >>>>>>>> independently
> >>>>>>>>> (even as privileged processes executing no code). Then the
> >>>>>> "per-job"
> >>>>>>>>> mode could still be helpful: when a job is submitted to the
> >>>>> dispatcher,
> >>>>>>>> it
> >>>>>>>>> launches the JM again in a per-job mode, so that JM and TM
> >>>> processes
> >>>>>> are
> >>>>>>>>> bound to the job only. For higher security setups, it is
> >>> important
> >>>>> that
> >>>>>>>>> processes are not reused across jobs.
> >>>>>>>>>
> >>>>>>>>> However, currently in "per-job" mode we generate JobGraph in
> >>> client
> >>>>>> side,
> >>>>>>>>> launching
> >>>>>>>>> the JobCluster and retrieve the JobGraph for execution. So
> >>>> actually,
> >>>>> we
> >>>>>>>>> don't "run the
> >>>>>>>>> client inside the cluster".
> >>>>>>>>>
> >>>>>>>>> Besides, referring to the discussion with Till[1], it would be
> >>> helpful
> >>>> if we
> >>>>>>>>> follow the same process
> >>>>>>>>> of session mode for that of "per-job" mode in user perspective,
> >>>> that
> >>>>> we
> >>>>>>>>> don't use
> >>>>>>>>> OptimizedPlanEnvironment to create JobGraph, but directly deploy
> >>>>> Flink
> >>>>>>>>> cluster in env.execute.
> >>>>>>>>>
> >>>>>>>>> Generally, 2 points:
> >>>>>>>>>
> >>>>>>>>> 1. Run the Flink job by invoking the user main method and executing
> >>>>> throughout,
> >>>>>>>>> instead of creating the
> >>>>>>>>> JobGraph from the main class.
> >>>>>>>>> 2. Run the client inside the cluster.
> >>>>>>>>>
> >>>>>>>>> If 1 and 2 are implemented, there is obviously no need for
> >>> DETACHED
> >>>>> mode
> >>>>>> in
> >>>>>>>>> cluster side
> >>>>>>>>> because we just shutdown the cluster on the exit of client that
> >>>>> running
> >>>>>>>>> inside cluster. Whether
> >>>>>>>>> or not the result is delivered is up to user code.
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388
> >>>>>>>>> [2]
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>> https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午2:13写道:
> >>>>>>>>>
> >>>>>>>>>> Thanks for your replies Kostas & Aljoscha!
> >>>>>>>>>>
> >>>>>>>>>> Below are replies point by point.
> >>>>>>>>>>
> >>>>>>>>>> 1. For DETACHED mode, what I said there is about the DETACHED
> >>> mode
> >>>>> in
> >>>>>>>>>> client side.
> >>>>>>>>>> There are two configurations overload the item DETACHED[1].
> >>>>>>>>>>
> >>>>>>>>>> In client side, it means whether or not client.submitJob is
> >>>> blocking
> >>>>>> to
> >>>>>>>>>> job execution result.
> >>>>>>>>>> Because client.submitJob returns CompletableFuture<JobClient>,
> >>>>>>>>>> NON-DETACHED has no power at all. The caller of submitJob decides
> >>>>>>>>>> whether or not to block to get the JobClient and request the job
> >>>>>>>>>> execution result. If the client crashes, it is a user-scope
> >>>>>>>>>> exception that should be handled in user code; if the client loses
> >>>>>>>>>> its connection to the cluster, we have a retry-times and interval
> >>>>>>>>>> configuration that automatically retries and throws a user-scope
> >>>>>>>>>> exception if exceeded.
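
The client-side behavior described here can be sketched as follows; JobClient and submitJob are local stand-ins mirroring the FLIP-74 proposal, not the real Flink classes:

```java
import java.util.concurrent.CompletableFuture;

public class AsyncSubmissionSketch {

    // Local stand-in for the proposed JobClient.
    interface JobClient {
        CompletableFuture<String> getJobExecutionResult();
    }

    // A real client would talk to the cluster; here the futures complete
    // immediately so the sketch is runnable on its own.
    static CompletableFuture<JobClient> submitJob(String jobName) {
        JobClient client = () -> CompletableFuture.completedFuture(jobName + ": FINISHED");
        return CompletableFuture.completedFuture(client);
    }

    public static void main(String[] args) {
        // "NON-DETACHED" is just the caller choosing to block on the futures:
        String result = submitJob("wordcount")
                .thenCompose(JobClient::getJobExecutionResult)
                .join();
        System.out.println(result);

        // "DETACHED" is the caller simply not waiting for the result:
        submitJob("etl-job");
    }
}
```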
> >>>>>>>>>>
> >>>>>>>>>> Your comment about poll for result or job result sounds like a
> >>>>> concern
> >>>>>>>> on
> >>>>>>>>>> cluster side.
> >>>>>>>>>>
> >>>>>>>>>> In cluster side, DETACHED mode is alive only in JobCluster. If
> >>>>>> DETACHED
> >>>>>>>>>> configured,
> >>>>>>>>>> JobCluster exits on job finished; if NON-DETACHED configured,
> >>>>>> JobCluster
> >>>>>>>>>> exits on job
> >>>>>>>>>> execution result delivered. FLIP-74 doesn't stick to changes on
> >>>> this
> >>>>>>>>>> scope, it is just remained.
> >>>>>>>>>>
> >>>>>>>>>> However, it is an interesting part we can revisit this
> >>>>> implementation
> >>>>>> a
> >>>>>>>>>> bit.
> >>>>>>>>>>
> >>>>>>>>>> <see the next email for compact reply in this one>
> >>>>>>>>>>
> >>>>>>>>>> 2. The retrieval of JobClient is so important that if we don't
> >>>> have
> >>>>> a
> >>>>>>>> way
> >>>>>>>>>> to retrieve JobClient it is
> >>>>>>>>>> a dumb public user-facing interface(what a strange state :P).
> >>>>>>>>>>
> >>>>>>>>>> About the retrieval of JobClient, as mentioned in the document,
> >>>> two
> >>>>>> ways
> >>>>>>>>>> should be supported.
> >>>>>>>>>>
> >>>>>>>>>> (1). Retrieved as return type of job submission.
> >>>>>>>>>> (2). Retrieve a JobClient of an existing job (with job id).
> >>>>>>>>>>
> >>>>>>>>>> I highly respect your thoughts about how Executors should be
> >>> and
> >>>>>>>> thoughts
> >>>>>>>>>> on multi-layered clients.
> >>>>>>>>>> Although, (2) is not supported by public interfaces as summary
> >>> of
> >>>>>>>>>> discussion above, we can discuss
> >>>>>>>>>> a bit on the place of Executors on multi-layered clients and
> >>> find
> >>>> a
> >>>>>> way
> >>>>>>>>>> to retrieve JobClient of
> >>>>>>>>>> existing job with public client API. I will comment in FLIP-73
> >>>>>> thread[2]
> >>>>>>>>>> since it is almost about Executors.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> tison.
> >>>>>>>>>>
> >>>>>>>>>> [1]
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8
> >>>>>>>>>> [2]
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>> https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Kostas Kloudas <[hidden email]> 于2019年9月25日周三 下午9:29写道:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Tison,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the FLIP and launching the discussion!
> >>>>>>>>>>>
> >>>>>>>>>>> As a first note, big +1 on providing/exposing a JobClient to
> >>> the
> >>>>>> users!
> >>>>>>>>>>>
> >>>>>>>>>>> Some points that would be nice to be clarified:
> >>>>>>>>>>> 1) You mention that we can get rid of the DETACHED mode: I
> >>> agree
> >>>>> that
> >>>>>>>>>>> at a high level, given that everything will now be
> >>> asynchronous,
> >>>>>> there
> >>>>>>>>>>> is no need to keep the DETACHED mode but I think we should
> >>>> specify
> >>>>>>>>>>> some aspects. For example, without the explicit separation of
> >>> the
> >>>>>>>>>>> modes, what happens when the job finishes. Does the client
> >>>>>>>>>>> periodically poll for the result always or the result is
> >>> pushed
> >>>>> when
> >>>>>>>>>>> in NON-DETACHED mode? What happens if the client disconnects
> >>> and
> >>>>>>>>>>> reconnects?
> >>>>>>>>>>>
> >>>>>>>>>>> 2) On the "how to retrieve a JobClient for a running Job", I
> >>>> think
> >>>>>>>>>>> this is related to the other discussion you opened in the ML
> >>>> about
> >>>>>>>>>>> multi-layered clients. First of all, I agree that exposing
> >>>>> different
> >>>>>>>>>>> "levels" of clients would be a nice addition, and actually
> >>> there
> >>>>> have
> >>>>>>>>>>> been some discussions about doing so in the future. Now for
> >>> this
> >>>>>>>>>>> specific discussion:
> >>>>>>>>>>>     i) I do not think that we should expose the
> >>>>>>>>>>> ClusterDescriptor/ClusterSpecification to the user, as this
> >>> ties
> >>>> us
> >>>>>> to
> >>>>>>>>>>> a specific architecture which may change in the future.
> >>>>>>>>>>>    ii) I do not think it should be the Executor that will
> >>>> provide
> >>>>> a
> >>>>>>>>>>> JobClient for an already running job (only for the Jobs that
> >>> it
> >>>>>>>>>>> submits). The job of the executor should just be to execute()
> >>> a
> >>>>>>>>>>> pipeline.
> >>>>>>>>>>>    iii) I think a solution that respects the separation of
> >>>>> concerns
> >>>>>>>>>>> could be the addition of another component (in the future),
> >>>>> something
> >>>>>>>>>>> like a ClientFactory, or ClusterFactory that will have methods
> >>>>> like:
> >>>>>>>>>>> ClusterClient createCluster(Configuration), JobClient
> >>>>>>>>>>> retrieveJobClient(Configuration, JobId), maybe even (although
> >>>> not
> >>>>>>>>>>> sure) Executor getExecutor(Configuration ) and maybe more.
> >>> This
> >>>>>>>>>>> component would be responsible to interact with a cluster
> >>> manager
> >>>>>> like
> >>>>>>>>>>> Yarn and do what is now being done by the ClusterDescriptor
> >>> plus
> >>>>> some
> >>>>>>>>>>> more stuff.
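
Point iii) could look roughly like the sketch below; every name (ClientFactory, Configuration, JobID, ...) is a placeholder for discussion, not existing Flink API:

```java
public class ClientFactorySketch {

    // Placeholder types standing in for their Flink counterparts.
    static class Configuration {}
    static final class JobID {
        final String id;
        JobID(String id) { this.id = id; }
    }
    interface ClusterClient {}
    interface JobClient { JobID jobId(); }

    // The factory owns all interaction with the cluster manager, so the
    // Executor can stay limited to execute()-ing a pipeline.
    interface ClientFactory {
        ClusterClient createCluster(Configuration config);
        JobClient retrieveJobClient(Configuration config, JobID jobId);
    }

    public static void main(String[] args) {
        // Minimal in-memory implementation, just to exercise the contract.
        ClientFactory factory = new ClientFactory() {
            public ClusterClient createCluster(Configuration config) {
                return new ClusterClient() {};
            }
            public JobClient retrieveJobClient(Configuration config, JobID jobId) {
                return () -> jobId;  // attach to an already-running job by id
            }
        };
        JobClient client = factory.retrieveJobClient(new Configuration(), new JobID("job-42"));
        System.out.println(client.jobId().id);
    }
}
```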
> >>>>>>>>>>>
> >>>>>>>>>>> Although under the hood all these abstractions (Environments,
> >>>>>>>>>>> Executors, ...) underneath use the same clients, I believe
> >>> their
> >>>>>>>>>>> job/existence is not contradicting but they simply hide some
> >>> of
> >>>> the
> >>>>>>>>>>> complexity from the user, and give us, as developers some
> >>> freedom
> >>>>> to
> >>>>>>>>>>> change in the future some of the parts. For example, the
> >>> executor
> >>>>>> will
> >>>>>>>>>>> take a Pipeline, create a JobGraph and submit it, instead of
> >>>>>> requiring
> >>>>>>>>>>> the user to do each step separately. This allows us to, for
> >>>>> example,
> >>>>>>>>>>> get rid of the Plan if in the future everything is DataStream.
> >>>>>>>>>>> Essentially, I think of these as layers of an onion with the
> >>>>> clients
> >>>>>>>>>>> being close to the core. The higher you go, the more
> >>>> functionality
> >>>>> is
> >>>>>>>>>>> included and hidden from the public eye.
> >>>>>>>>>>>
> >>>>>>>>>>> Point iii) by the way is just a thought and by no means
> >>> final. I
> >>>>> also
> >>>>>>>>>>> like the idea of multi-layered clients so this may spark up
> >>> the
> >>>>>>>>>>> discussion.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Kostas
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek <
> >>>>>> [hidden email]>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi Tison,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for proposing the document! I had some comments on the
> >>>>>>>> document.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think the only complex thing that we still need to figure
> >>> out
> >>>> is
> >>>>>>>> how
> >>>>>>>>>>> to get a JobClient for a job that is already running. As you
> >>>>>> mentioned
> >>>>>>>> in
> >>>>>>>>>>> the document. Currently I’m thinking that its ok to add a
> >>> method
> >>>> to
> >>>>>>>>>>> Executor for retrieving a JobClient for a running job by
> >>>> providing
> >>>>> an
> >>>>>>>> ID.
> >>>>>>>>>>> Let’s see what Kostas has to say on the topic.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On 25. Sep 2019, at 12:31, Zili Chen <[hidden email]>
> >>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Summary from the discussion about introducing Flink
> >>> JobClient
> >>>>>>>> API[1]
> >>>>>>>>>>> we
> >>>>>>>>>>>>> draft FLIP-74[2] to
> >>>>>>>>>>>>> gather thoughts and towards a standard public user-facing
> >>>>>>>> interfaces.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This discussion thread aims at standardizing job level
> >>> client
> >>>>> API.
> >>>>>>>>>>> But I'd
> >>>>>>>>>>>>> like to emphasize that
> >>>>>>>>>>>>> how to retrieve JobClient possibly causes further
> >>> discussion on
> >>>>>>>>>>> different
> >>>>>>>>>>>>> level clients exposed from
> >>>>>>>>>>>>> Flink so that a following thread will be started later to
> >>>>>>>> coordinate
> >>>>>>>>>>>>> FLIP-73 and FLIP-74 on
> >>>>>>>>>>>>> expose issue.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Looking forward to your opinions.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> tison.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> >>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>> --
> >>>>>
> >>>>> Konstantin Knauf | Solutions Architect
> >>>>>
> >>>>> +49 160 91394525
> >>>>>
> >>>>>
> >>>>> Follow us @VervericaData Ververica <https://www.ververica.com/>
> >>>>>
> >>>>>
> >>>>> --
> >>>>>
> >>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> >>>>> Conference
> >>>>>
> >>>>> Stream Processing | Event Driven | Real Time
> >>>>>
> >>>>> --
> >>>>>
> >>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >>>>>
> >>>>> --
> >>>>> Ververica GmbH
> >>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> >>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
> >>> Ji
> >>>>> (Tony) Cheng
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>>
> >>> Konstantin Knauf | Solutions Architect
> >>>
> >>> +49 160 91394525
> >>>
> >>>
> >>> Follow us @VervericaData Ververica <https://www.ververica.com/>
> >>>
> >>>
> >>> --
> >>>
> >>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> >>> Conference
> >>>
> >>> Stream Processing | Event Driven | Real Time
> >>>
> >>> --
> >>>
> >>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >>>
> >>> --
> >>> Ververica GmbH
> >>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> >>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> >>> (Tony) Cheng
> >>>
> >>
>

Re: [DISCUSS] FLIP-74: Flink JobClient API

tison
Thanks for your replies.

Since there has been no objection to Konstantin's proposal so far, I'd like
to update the FLIP accordingly. The changes are the naming issue and the
exclusion of deprecated functionality.

I hereby infer that our community generally agrees on the introduction
of the JobClient and its interfaces proposed in the FLIP. If there are other
concerns, please throw them into this thread. Otherwise I'm going to start a
vote thread later.

Best,
tison.
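
The interface shape implied by this consensus might look like the sketch below: fully asynchronous, with "stop" (always taking a savepoint) and without the deprecated "cancelWithSavepoint". The method set and signatures are assumptions drawn from this thread, not the final API:

```java
import java.util.concurrent.CompletableFuture;

public class JobClientShape {

    // Sketch of the proposed interface; all methods are asynchronous.
    interface JobClient {
        CompletableFuture<String> getJobStatus();
        CompletableFuture<Void> cancel();
        // "stop" always performs a savepoint and completes with its path.
        CompletableFuture<String> stop(String savepointDirectory);
    }

    public static void main(String[] args) {
        // Trivial in-memory implementation, only to exercise the shape.
        JobClient client = new JobClient() {
            public CompletableFuture<String> getJobStatus() {
                return CompletableFuture.completedFuture("RUNNING");
            }
            public CompletableFuture<Void> cancel() {
                return CompletableFuture.completedFuture(null);
            }
            public CompletableFuture<String> stop(String savepointDirectory) {
                return CompletableFuture.completedFuture(savepointDirectory + "/savepoint-1");
            }
        };
        System.out.println(client.getJobStatus().join());
        System.out.println(client.stop("s3://savepoints").join());
    }
}
```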


Kostas Kloudas <[hidden email]> 于2019年10月4日周五 下午11:33写道:

> I also agree @Zili Chen !
>
> On Fri, Oct 4, 2019 at 10:17 AM Aljoscha Krettek <[hidden email]>
> wrote:
> >
> > This makes sense to me, yes!
> >
> > > On 2. Oct 2019, at 20:35, Zili Chen <[hidden email]> wrote:
> > >
> > > Hi all,
> > >
> > > Narrowing the scope to FLIP-74, we aim to introduce a useful and
> extensible
> > > user-facing public interface, JobClient. Let me re-emphasize two major
> works
> > > under this thread.
> > >
> > > 1. standard interface
> > >
> > > As in FLIP-74 we introduce an interface JobClient with its methods,
> we'd
> > > like to
> > > make it a standard (non-final since we can always extends on demand)
> > > interface.
> > >
> > > On this branch I'd like to, with respect to Konstantin's suggestion, 1)
> > > exclude deprecated
> > > cancelWithSavepoint from the proposal 2) rename stopWithSavepoint to
> stop
> > > to keep
> > > consistency with our CLI command. If there is no more concern on these
> > > topics I will
> > > update proposal tomorrow.
> > >
> > > 2. client interfaces are asynchronous
> > >
> > > If the asynchronous JobClient interfaces are approved, a necessary
> > > proposed change is a
> > > corresponding update to the ClusterClient interfaces. Still ClusterClient is
> an
> > > internal concept
> > > after this FLIP but it might have some impact so I think it's better to
> > > reach a community
> > > consensus as a prerequisite. Note that with all client methods
> > > asynchronous, the client-side
> > > detach option has no effect whether or not we remove it.
> > >
> > > Let me know your ideas on these topic and keep moving forward :-)
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Zili Chen <[hidden email]> 于2019年10月2日周三 下午4:10写道:
> > >
> > >> Hi Konstantin,
> > >>
> > >> * should we add "cancelWithSavepoint" to a new public API, when we
> have
> > >> deprecated the corresponding REST API/CLI methods? In my understanding
> > >> there is no reason to use it anymore.
> > >>
> > >> Good point. We can exclude "cancelWithSavepoint" from public API at
> least
> > >> for now,
> > >> since it is deprecated already. Let's see if there is other concerns.
> > >>
> > >> * should we call "stopWithSavepoint" simply "stop" as "stop" always
> > >> performs a savepoint?
> > >>
> > >> Well for naming issue I'm fine with that if it is a consensus of our
> > >> community. I can see
> > >> there is a "stop" CLI command which means "stop with savepoint".
> > >>
> > >> Best,
> > >> tison.
> > >>
> > >>
> > >> Konstantin Knauf <[hidden email]> 于2019年9月30日周一 下午12:16写道:
> > >>
> > >>> Hi Thomas,
> > >>>
> > >>> maybe there is a misunderstanding. There is no plan to deprecate
> anything
> > >>> in the REST API in the process of introducing the JobClient API, and
> it
> > >>> shouldn't.
> > >>>
> > >>> Since "cancel with savepoint" was already deprecated in the REST API
> and
> > >>> CLI, I am just raising the question whether to add it to the
> JobClient API
> > >>> in the first place.
> > >>>
> > >>> Best,
> > >>>
> > >>> Konstantin
> > >>>
> > >>>
> > >>>
> > >>> On Mon, Sep 30, 2019 at 1:16 AM Thomas Weise <[hidden email]> wrote:
> > >>>
> > >>>> I did not realize there was a plan to deprecate anything in the REST
> > >>> API?
> > >>>>
> > >>>> The REST API is super important for tooling written in non JVM
> > >>> languages,
> > >>>> that does not include a Flink client (like FlinkK8sOperator). The
> REST
> > >>> API
> > >>>> should continue to support all job management operations, including
> job
> > >>>> submission.
> > >>>>
> > >>>> Thomas
> > >>>>
> > >>>>
> > >>>> On Sun, Sep 29, 2019 at 1:37 PM Konstantin Knauf <
> > >>> [hidden email]
> > >>>>>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi Zili,
> > >>>>>
> > >>>>> thanks for working on this topic. Just read through the FLIP and I
> > >>> have
> > >>>> two
> > >>>>> questions:
> > >>>>>
> > >>>>> * should we add "cancelWithSavepoint" to a new public API, when we
> > >>> have
> > >>>>> deprecated the corresponding REST API/CLI methods? In my
> understanding
> > >>>>> there is no reason to use it anymore.
> > >>>>> * should we call "stopWithSavepoint" simply "stop" as "stop" always
> > >>>>> performs a savepoint?
> > >>>>>
> > >>>>> Best,
> > >>>>>
> > >>>>> Konstantin
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Fri, Sep 27, 2019 at 10:48 AM Aljoscha Krettek <
> > >>> [hidden email]>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi Flavio,
> > >>>>>>
> > >>>>>> I agree that this would be good to have. But I also think that
> this
> > >>> is
> > >>>>>> outside the scope of FLIP-74, I think it is an orthogonal feature.
> > >>>>>>
> > >>>>>> Best,
> > >>>>>> Aljoscha
> > >>>>>>
> > >>>>>>> On 27. Sep 2019, at 10:31, Flavio Pompermaier <
> > >>> [hidden email]>
> > >>>>>> wrote:
> > >>>>>>>
> > >>>>>>> Hi all,
> > >>>>>>> just a remark about the Flink REST APIs (and its client as well):
> > >>>>> almost
> > >>>>>>> all the times we need a way to dynamically know which jobs are
> > >>>>> contained
> > >>>>>> in
> > >>>>>>> a jar file, and this could be exposed by the REST endpoint under
> > >>>>>>> /jars/:jarid/entry-points (a simple way to implement this would
> > >>> be to
> > >>>>>> check
> > >>>>>>> the value of Main-class or Main-classes inside the Manifest of
> the
> > >>>> jar
> > >>>>> if
> > >>>>>>> they exist [1]).
> > >>>>>>>
> > >>>>>>> I understand that this is something that is not strictly required
> > >>> to
> > >>>>>>> execute Flink jobs but IMHO it would ease A LOT the work of UI
> > >>>>> developers
> > >>>>>>> that could have a way to show the users all available jobs inside
> > >>> a
> > >>>>> jar +
> > >>>>>>> their configurable parameters.
> > >>>>>>> For example, right now in the WebUI, you can upload a jar and
> then
> > >>>> you
> > >>>>>> have
> > >>>>>>> to set (without any autocomplete or UI support) the main class
> and
> > >>>>> their
> > >>>>>>> params (for example using a string like --param1 xx --param2 yy).
> > >>>>>>> Adding this functionality to the REST API and the respective
> > >>> client
> > >>>>> would
> > >>>>>>> enable the WebUI (and all UIs interacting with a Flink cluster)
> to
> > >>>>>> prefill
> > >>>>>>> a dropdown list containing the list of entry-point classes (i.e.
> > >>>> Flink
> > >>>>>>> jobs) and, once selected, their required (typed) parameters.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Flavio
> > >>>>>>>
> > >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-10864
> > >>>>>>>
> > >>>>>>> On Fri, Sep 27, 2019 at 9:16 AM Zili Chen <[hidden email]>
> > >>>>> wrote:
> > >>>>>>>
> > >>>>>>>> modify
> > >>>>>>>>
> > >>>>>>>> /we just shutdown the cluster on the exit of client that running
> > >>>>> inside
> > >>>>>>>> cluster/
> > >>>>>>>>
> > >>>>>>>> to
> > >>>>>>>>
> > >>>>>>>> we just shutdown the cluster on both the exit of client that
> > >>> running
> > >>>>>> inside
> > >>>>>>>> cluster and the finish of job.
> > >>>>>>>> Since client is running inside cluster we can easily wait for
> the
> > >>>> end
> > >>>>> of
> > >>>>>>>> both in ClusterEntrypoint.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午3:13写道:
> > >>>>>>>>
> > >>>>>>>>> About JobCluster
> > >>>>>>>>>
> > >>>>>>>>> Actually I am not quite sure what we gain from DETACHED
> > >>>>> configuration
> > >>>>>> on
> > >>>>>>>>> cluster side.
> > >>>>>>>>> We don't have a NON-DETACHED JobCluster in fact in our
> codebase,
> > >>>>> right?
> > >>>>>>>>>
> > >>>>>>>>> One major question comes to mind that we have to answer first.
> > >>>>>>>>>
> > >>>>>>>>> *What JobCluster conceptually is exactly*
> > >>>>>>>>>
> > >>>>>>>>> Related discussion can be found in JIRA[1] and mailing list[2].
> > >>>>> Stephan
> > >>>>>>>>> gives a nice
> > >>>>>>>>> description of JobCluster:
> > >>>>>>>>>
> > >>>>>>>>> Two things to add: - The job mode is very nice in the way that
> > >>> it
> > >>>>> runs
> > >>>>>>>> the
> > >>>>>>>>> client inside the cluster (in the same image/process that is
> the
> > >>>> JM)
> > >>>>>> and
> > >>>>>>>>> thus unifies both applications and what the Spark world calls
> > >>> the
> > >>>>>> "driver
> > >>>>>>>>> mode". - Another thing I would add is that during the FLIP-6
> > >>>> design,
> > >>>>> we
> > >>>>>>>>> were thinking about setups where Dispatcher and JobManager are
> > >>>>> separate
> > >>>>>>>>> processes. A Yarn or Mesos Dispatcher of a session could run
> > >>>>>>>> independently
> > >>>>>>>>> (even as privileged processes executing no code). Then the
> > >>>>>> "per-job"
> > >>>>>>>>> mode could still be helpful: when a job is submitted to the
> > >>>>> dispatcher,
> > >>>>>>>> it
> > >>>>>>>>> launches the JM again in a per-job mode, so that JM and TM
> > >>>> processes
> > >>>>>> are
> > >>>>>>>>> bound to the job only. For higher security setups, it is
> > >>> important
> > >>>>> that
> > >>>>>>>>> processes are not reused across jobs.
> > >>>>>>>>>
> > >>>>>>>>> However, currently in "per-job" mode we generate JobGraph in
> > >>> client
> > >>>>>> side,
> > >>>>>>>>> launching
> > >>>>>>>>> the JobCluster and retrieve the JobGraph for execution. So
> > >>>> actually,
> > >>>>> we
> > >>>>>>>>> don't "run the
> > >>>>>>>>> client inside the cluster".
> > >>>>>>>>>
> > >>>>>>>>> Besides, referring to the discussion with Till[1], it would be
> > >>> helpful
> > >>>> if we
> > >>>>>>>>> follow the same process
> > >>>>>>>>> of session mode for that of "per-job" mode in user perspective,
> > >>>> that
> > >>>>> we
> > >>>>>>>>> don't use
> > >>>>>>>>> OptimizedPlanEnvironment to create JobGraph, but directly
> deploy
> > >>>>> Flink
> > >>>>>>>>> cluster in env.execute.
> > >>>>>>>>>
> > >>>>>>>>> Generally, 2 points:
> > >>>>>>>>>
> > >>>>>>>>> 1. Run the Flink job by invoking the user main method and executing
> > >>>>> throughout,
> > >>>>>>>>> instead of creating the
> > >>>>>>>>> JobGraph from the main class.
> > >>>>>>>>> 2. Run the client inside the cluster.
> > >>>>>>>>>
> > >>>>>>>>> If 1 and 2 are implemented, there is obviously no need for
> > >>> DETACHED
> > >>>>> mode
> > >>>>>> in
> > >>>>>>>>> cluster side
> > >>>>>>>>> because we just shutdown the cluster on the exit of client that
> > >>>>> running
> > >>>>>>>>> inside cluster. Whether
> > >>>>>>>>> or not the result is delivered is up to user code.
> > >>>>>>>>>
> > >>>>>>>>> [1]
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16931388&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931388
> > >>>>>>>>> [2]
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> https://lists.apache.org/x/thread.html/e8f14a381be6c027e8945f884c3cfcb309ce49c1ba557d3749fca495@%3Cdev.flink.apache.org%3E
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Zili Chen <[hidden email]> 于2019年9月27日周五 下午2:13写道:
> > >>>>>>>>>
> > >>>>>>>>>> Thanks for your replies Kostas & Aljoscha!
> > >>>>>>>>>>
> > >>>>>>>>>> Below are replies point by point.
> > >>>>>>>>>>
> > >>>>>>>>>> 1. For DETACHED mode, what I said there is about the DETACHED
> > >>> mode
> > >>>>> in
> > >>>>>>>>>> client side.
> > >>>>>>>>>> There are two configurations overload the item DETACHED[1].
> > >>>>>>>>>>
> > >>>>>>>>>> In client side, it means whether or not client.submitJob is
> > >>>> blocking
> > >>>>>> to
> > >>>>>>>>>> job execution result.
> > >>>>>>>>>> Because client.submitJob returns CompletableFuture<JobClient>,
> > >>>>>>>>>> NON-DETACHED has no power at all. The caller of submitJob decides
> > >>>>>>>>>> whether or not to block to get the JobClient and request the job
> > >>>>>>>>>> execution result. If the client crashes, it is a user-scope
> > >>>>>>>>>> exception that should be handled in user code; if the client loses
> > >>>>>>>>>> its connection to the cluster, we have a retry-times and interval
> > >>>>>>>>>> configuration that automatically retries and throws a user-scope
> > >>>>>>>>>> exception if exceeded.
> > >>>>>>>>>>
> > >>>>>>>>>> Your comment about poll for result or job result sounds like a
> > >>>>> concern
> > >>>>>>>> on
> > >>>>>>>>>> cluster side.
> > >>>>>>>>>>
> > >>>>>>>>>> In cluster side, DETACHED mode is alive only in JobCluster. If
> > >>>>>> DETACHED
> > >>>>>>>>>> configured,
> > >>>>>>>>>> JobCluster exits on job finished; if NON-DETACHED configured,
> > >>>>>> JobCluster
> > >>>>>>>>>> exits on job
> > >>>>>>>>>> execution result delivered. FLIP-74 doesn't stick to changes
> on
> > >>>> this
> > >>>>>>>>>> scope, it is just remained.
> > >>>>>>>>>>
> > >>>>>>>>>> However, it is an interesting part we can revisit this
> > >>>>> implementation
> > >>>>>> a
> > >>>>>>>>>> bit.
> > >>>>>>>>>>
> > >>>>>>>>>> <see the next email for compact reply in this one>
> > >>>>>>>>>>
> > >>>>>>>>>> 2. The retrieval of JobClient is so important that if we don't
> > >>>> have
> > >>>>> a
> > >>>>>>>> way
> > >>>>>>>>>> to retrieve JobClient it is
> > >>>>>>>>>> a dumb public user-facing interface(what a strange state :P).
> > >>>>>>>>>>
> > >>>>>>>>>> About the retrieval of JobClient, as mentioned in the
> document,
> > >>>> two
> > >>>>>> ways
> > >>>>>>>>>> should be supported.
> > >>>>>>>>>>
> > >>>>>>>>>> (1). Retrieved as return type of job submission.
> > >>>>>>>>>> (2). Retrieve a JobClient of an existing job (with job id).
> > >>>>>>>>>>
> > >>>>>>>>>> I highly respect your thoughts about how Executors should be
> > >>> and
> > >>>>>>>> thoughts
> > >>>>>>>>>> on multi-layered clients.
> > >>>>>>>>>> Although, (2) is not supported by public interfaces as summary
> > >>> of
> > >>>>>>>>>> discussion above, we can discuss
> > >>>>>>>>>> a bit on the place of Executors on multi-layered clients and
> > >>> find
> > >>>> a
> > >>>>>> way
> > >>>>>>>>>> to retrieve JobClient of
> > >>>>>>>>>> existing job with public client API. I will comment in FLIP-73
> > >>>>>> thread[2]
> > >>>>>>>>>> since it is almost about Executors.
> > >>>>>>>>>>
> > >>>>>>>>>> Best,
> > >>>>>>>>>> tison.
> > >>>>>>>>>>
> > >>>>>>>>>> [1]
> > >>>>>>>>>>
> > >>>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?disco=AAAADnLLvM8
> > >>>>>>>>>> [2]
> > >>>>>>>>>>
> > >>>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> https://lists.apache.org/x/thread.html/dc3a541709f96906b43df4155373af1cd09e08c3f105b0bd0ba3fca2@%3Cdev.flink.apache.org%3E
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Kostas Kloudas <[hidden email]> 于2019年9月25日周三 下午9:29写道:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi Tison,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks for the FLIP and for launching the discussion!
> > >>>>>>>>>>>
> > >>>>>>>>>>> As a first note, a big +1 on providing/exposing a JobClient
> > >>>>>>>>>>> to the users!
> > >>>>>>>>>>>
> > >>>>>>>>>>> Some points that would be nice to clarify:
> > >>>>>>>>>>> 1) You mention that we can get rid of the DETACHED mode. I
> > >>>>>>>>>>> agree that at a high level, given that everything will now be
> > >>>>>>>>>>> asynchronous, there is no need to keep the DETACHED mode, but
> > >>>>>>>>>>> I think we should specify some aspects. For example, without
> > >>>>>>>>>>> the explicit separation of the modes, what happens when the
> > >>>>>>>>>>> job finishes? Does the client always poll periodically for
> > >>>>>>>>>>> the result, or is the result pushed when in NON-DETACHED
> > >>>>>>>>>>> mode? What happens if the client disconnects and reconnects?
> > >>>>>>>>>>>
> > >>>>>>>>>>> 2) On the "how to retrieve a JobClient for a running job": I
> > >>>>>>>>>>> think this is related to the other discussion you opened on
> > >>>>>>>>>>> the ML about multi-layered clients. First of all, I agree
> > >>>>>>>>>>> that exposing different "levels" of clients would be a nice
> > >>>>>>>>>>> addition, and there have actually been some discussions about
> > >>>>>>>>>>> doing so in the future. Now, for this specific discussion:
> > >>>>>>>>>>>     i) I do not think we should expose the
> > >>>>>>>>>>> ClusterDescriptor/ClusterSpecification to the user, as this
> > >>>>>>>>>>> ties us to a specific architecture which may change in the
> > >>>>>>>>>>> future.
> > >>>>>>>>>>>    ii) I do not think it should be the Executor that provides
> > >>>>>>>>>>> a JobClient for an already running job (only for the jobs
> > >>>>>>>>>>> that it submits). The job of the executor should just be to
> > >>>>>>>>>>> execute() a pipeline.
> > >>>>>>>>>>>   iii) I think a solution that respects the separation of
> > >>>>>>>>>>> concerns could be the addition of another component (in the
> > >>>>>>>>>>> future), something like a ClientFactory or ClusterFactory,
> > >>>>>>>>>>> with methods like ClusterClient createCluster(Configuration),
> > >>>>>>>>>>> JobClient retrieveJobClient(Configuration, JobId), maybe even
> > >>>>>>>>>>> (although I am not sure) Executor getExecutor(Configuration),
> > >>>>>>>>>>> and maybe more. This component would be responsible for
> > >>>>>>>>>>> interacting with a cluster manager like YARN and would do
> > >>>>>>>>>>> what is now being done by the ClusterDescriptor, plus some
> > >>>>>>>>>>> more.
> > >>>>>>>>>>>
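The ClientFactory idea in point iii) could be sketched roughly as follows. This is purely illustrative: `ClientFactory`, `ClusterClient`, the in-memory implementation, and the use of a plain map in place of Flink's Configuration are all assumptions for the example, not a concrete proposal.

```java
import java.util.HashMap;
import java.util.Map;

// Purely illustrative sketch of the ClientFactory idea; all names are
// assumptions, not final Flink API.
public class ClientFactorySketch {

    /** Stand-in for a per-job client. */
    interface JobClient {
        String getJobId();
        String getJobStatus();
    }

    /** Stand-in for a cluster-level client that can submit jobs. */
    interface ClusterClient {
        JobClient submit(String jobName);
    }

    /** The factory: creates clusters and retrieves clients for existing jobs. */
    interface ClientFactory {
        ClusterClient createCluster(Map<String, String> configuration);
        JobClient retrieveJobClient(Map<String, String> configuration, String jobId);
    }

    /** Toy in-memory implementation, standing in for e.g. a YARN-backed one. */
    static class InMemoryClientFactory implements ClientFactory {
        private final Map<String, String> jobStatus = new HashMap<>();

        @Override
        public ClusterClient createCluster(Map<String, String> configuration) {
            // Submitting through the returned client registers the job,
            // then hands back a JobClient for it.
            return jobName -> {
                String jobId = jobName + "-id";
                jobStatus.put(jobId, "RUNNING");
                return retrieveJobClient(configuration, jobId);
            };
        }

        @Override
        public JobClient retrieveJobClient(Map<String, String> configuration, String jobId) {
            return new JobClient() {
                public String getJobId() { return jobId; }
                public String getJobStatus() { return jobStatus.getOrDefault(jobId, "UNKNOWN"); }
            };
        }
    }

    public static void main(String[] args) {
        ClientFactory factory = new InMemoryClientFactory();
        ClusterClient cluster = factory.createCluster(Map.of());
        JobClient submitted = cluster.submit("wordcount");
        // Later, possibly from another process: retrieve by id only.
        JobClient retrieved = factory.retrieveJobClient(Map.of(), submitted.getJobId());
        System.out.println(retrieved.getJobId() + " " + retrieved.getJobStatus());
    }
}
```

Note how the factory, not the Executor, owns both cluster creation and retrieval of clients for already-running jobs, which is exactly the separation of concerns argued for above.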
> > >>>>>>>>>>> Although under the hood all these abstractions
> > >>>>>>>>>>> (Environments, Executors, ...) use the same clients, I
> > >>>>>>>>>>> believe their jobs/existence are not contradictory: they
> > >>>>>>>>>>> simply hide some of the complexity from the user and give us,
> > >>>>>>>>>>> as developers, some freedom to change some of the parts in
> > >>>>>>>>>>> the future. For example, the executor will take a Pipeline,
> > >>>>>>>>>>> create a JobGraph, and submit it, instead of requiring the
> > >>>>>>>>>>> user to do each step separately. This allows us, for example,
> > >>>>>>>>>>> to get rid of the Plan if in the future everything is
> > >>>>>>>>>>> DataStream. Essentially, I think of these as layers of an
> > >>>>>>>>>>> onion, with the clients close to the core. The higher you go,
> > >>>>>>>>>>> the more functionality is included and hidden from the public
> > >>>>>>>>>>> eye.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Point iii), by the way, is just a thought and by no means
> > >>>>>>>>>>> final. I also like the idea of multi-layered clients, so this
> > >>>>>>>>>>> may spark up the discussion.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>> Kostas
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Wed, Sep 25, 2019 at 2:21 PM Aljoscha Krettek <[hidden email]> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Hi Tison,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks for proposing the document! I had some comments on
> > >>>>>>>>>>>> the document.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I think the only complex thing that we still need to figure
> > >>>>>>>>>>>> out is how to get a JobClient for a job that is already
> > >>>>>>>>>>>> running, as you mentioned in the document. Currently I'm
> > >>>>>>>>>>>> thinking that it's ok to add a method to Executor for
> > >>>>>>>>>>>> retrieving a JobClient for a running job by providing an
> > >>>>>>>>>>>> ID. Let's see what Kostas has to say on the topic.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Best,
> > >>>>>>>>>>>> Aljoscha
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> On 25. Sep 2019, at 12:31, Zili Chen <[hidden email]> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Following the discussion about introducing a Flink
> > >>>>>>>>>>>>> JobClient API[1], we have drafted FLIP-74[2] to gather
> > >>>>>>>>>>>>> thoughts and work toward standard public user-facing
> > >>>>>>>>>>>>> interfaces.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> This discussion thread aims at standardizing the job-level
> > >>>>>>>>>>>>> client API. But I'd like to emphasize that how to retrieve
> > >>>>>>>>>>>>> a JobClient may cause further discussion on the different
> > >>>>>>>>>>>>> levels of clients exposed by Flink, so a follow-up thread
> > >>>>>>>>>>>>> will be started later to coordinate FLIP-73 and FLIP-74 on
> > >>>>>>>>>>>>> the exposure issue.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Looking forward to your opinions.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>> tison.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> [1] https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> > >>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>> --
> > >>>>>
> > >>>>> Konstantin Knauf | Solutions Architect
> > >>>>>
> > >>>>> +49 160 91394525
> > >>>>>
> > >>>>> Follow us @VervericaData Ververica <https://www.ververica.com/>
> > >>>>>
> > >>>>> --
> > >>>>>
> > >>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> > >>>>> Conference
> > >>>>>
> > >>>>> Stream Processing | Event Driven | Real Time
> > >>>>>
> > >>>>> --
> > >>>>>
> > >>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> > >>>>>
> > >>>>> --
> > >>>>> Ververica GmbH
> > >>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > >>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
> > >>>>> Ji (Tony) Cheng
> > >>>>
> > >>>
> > >>>
> > >>>
> > >>
> >
>