REST API / JarRunHandler: More flexibility for launching jobs

Thomas Weise
Hi,

While considering different options to launch Beam jobs through the Flink
REST API, I noticed that the implementation of JarRunHandler places quite a
few restrictions on how the entry point shall construct a Flink job, by
extracting and manipulating the job graph.

That's normally not a problem for Flink Java programs, but in the scenario
I'm looking at, the job graph would be constructed by a different process
and isn't available to the REST handler. Instead, I would like to be able
to just respond with the job ID of the already launched job.

For context, please see:

https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.fh2f571kms4d

The current JarRunHandler code is here:

https://github.com/apache/flink/blob/f3c5dd960ff81a022ece2391ed3aee86080a352a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java#L82

It would be nice if there was an option to delegate the responsibility for
job submission to the user code / entry point. That would be useful for
Beam and other frameworks built on top of Flink that dynamically create a
job graph from a different representation.

Possible ways to get there:

* an interface that the main class can implement and that, when present, the
jar run handler calls instead of main() (see the sketch below)

* an annotated method

Either way, query parameters like savepoint path and parallelism would be
forwarded to the user code, and the result would be the ID of the launched
job.
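
To make the first option concrete, here is a minimal sketch (the interface
name and signature are made up for illustration; nothing like this exists
in Flink today):

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;

/** Hypothetical contract between the jar run handler and user code. */
public interface SelfSubmittingProgram {

    /**
     * Invoked by the jar run handler instead of main(). The handler would
     * pass the REST query parameters (savepoint path, parallelism, program
     * arguments) and expect back the ID of the job the user code launched.
     */
    JobID run(Configuration parameters, String... args) throws Exception;
}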

Thoughts?

Thanks,
Thomas

Re: REST API / JarRunHandler: More flexibility for launching jobs

Till Rohrmann
Hi Thomas,

quick question: why do you want to use the JarRunHandler? If another process
is building the JobGraph, then one could use the JobSubmitHandler, which
expects a JobGraph and starts executing it.
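
For illustration, a rough sketch of that path through RestClusterClient
(which submits via the JobSubmitHandler endpoint), against the 1.8-era
client API; exact signatures may differ between versions, and
buildJobGraphSomehow() is a placeholder for however the other process
produces the JobGraph:

import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class DirectSubmission {

    public static void main(String[] args) throws Exception {
        // Point the REST client at the cluster's web endpoint.
        Configuration config = new Configuration();
        config.setString(RestOptions.ADDRESS, "jobmanager-host");
        config.setInteger(RestOptions.PORT, 8081);

        RestClusterClient<StandaloneClusterId> client =
            new RestClusterClient<>(config, StandaloneClusterId.getInstance());
        try {
            JobGraph jobGraph = buildJobGraphSomehow();
            JobSubmissionResult result = client.submitJob(
                jobGraph, DirectSubmission.class.getClassLoader());
            System.out.println("Submitted job " + result.getJobID());
        } finally {
            client.shutdown();
        }
    }

    // Placeholder standing in for the externally built JobGraph.
    private static JobGraph buildJobGraphSomehow() {
        throw new UnsupportedOperationException("not part of the sketch");
    }
}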

Cheers,
Till


Re: REST API / JarRunHandler: More flexibility for launching jobs

Thomas Weise
Hi Till,

Thanks for taking a look!

The Beam job server does not currently have the ability to just output the
job graph (and related artifacts) that could then be used with the
JobSubmitHandler. It itself uses StreamExecutionEnvironment, which in turn
leads to a REST API submission.

Here I'm looking at what happens before the Beam job server gets involved:
the interaction of the k8s operator with the Flink deployment. The jar run
endpoint (ignoring the current handler implementation) is generic and
pretty much exactly matches what we would need for a uniform entry point.
It's just that in the Beam case the jar file would itself be a "launcher"
that doesn't provide the job graph directly, but rather the dependencies
and the mechanism to invoke the actual client.

I could accomplish what I'm looking for by creating a separate REST
endpoint that looks almost the same. But I would prefer to reuse the Flink
REST API interaction that is already implemented for the Flink Java jobs to
reduce the complexity of the deployment.

Thomas

Re: REST API / JarRunHandler: More flexibility for launching jobs

tison
Hi Thomas,

IIUC this "launcher" should run on the client side rather than on the
dispatcher side. "jar run" extracts the job graph and submits it to the
dispatcher, which does not match the semantics you want.

Could you run it with CliFrontend? Or propose that "jar run" supports
running the main method directly instead of extracting the job graph?

Best,
tison.

Re: REST API / JarRunHandler: More flexibility for launching jobs

Chesnay Schepler
In reply to this post by Thomas Weise
Couldn't the beam job server use the same work-around we're using in the
JarRunHandler to get access to the JobGraph?
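
For reference, that work-around boils down to something like the following
(paraphrased from the 1.8-era handler code, not an exact quote):

import java.io.File;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class JarToJobGraph {

    static JobGraph toJobGraph(File jarFile, String entryClass,
            Configuration configuration, int parallelism,
            String... programArgs) throws Exception {
        // createJobGraph runs main() under an OptimizerPlanEnvironment,
        // captures the plan when execute() is called, aborts the rest of
        // main(), and compiles the captured plan into a JobGraph.
        PackagedProgram program =
            new PackagedProgram(jarFile, entryClass, programArgs);
        return PackagedProgramUtils.createJobGraph(
            program, configuration, parallelism);
    }
}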


Re: REST API / JarRunHandler: More flexibility for launching jobs

Till Rohrmann
Are you looking for something similar to the `Program` interface? This
interface, even though it is a bit outdated and might get removed in the
future, offers a `getPlan` method which is called in order to generate the
`JobGraph`. In the client refactoring discussion thread it is currently
being discussed what to do with this interface.
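
For reference, the interface looks roughly like this (paraphrased; getPlan
returns a Plan from which the client then compiles the JobGraph):

import org.apache.flink.api.common.Plan;

public interface Program {

    /**
     * Returns the plan of the program. The client compiles this plan
     * into the JobGraph that actually gets submitted.
     */
    Plan getPlan(String... args);
}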

Cheers,
Till


Re: REST API / JarRunHandler: More flexibility for launching jobs

tison
I don't think the `Program` interface could solve the problem.

The launcher launches the job server, which creates the job graph,
submits it, and keeps monitoring it. Even if the user program implemented
`Program`, Flink would still extract the JobGraph from `getPlan` and
submit it, instead of actually executing the code in the main method of
the user program, so the launcher would never be started.

@Thomas,

There is an ongoing discussion on client refactoring[1], as Till
mentioned. However, I'm afraid that the current "jar run" semantics,
i.e., extract the job graph and submit it to the Dispatcher, cannot
fit your requirement. The problem is that the REST API communicates
directly with the Dispatcher, and thus it's strange to tell the
Dispatcher "just run a program in a process".

As you mentioned in the document, with the CLI in session mode the
whole program is executed sequentially. I'd appreciate it if you could
participate in the client refactoring thread[1]. In the design
document[2], we propose to provide rich interfaces for downstream
projects to integrate with. You could customize your own CLI for
executing your program arbitrarily. Any requirements or advice would
be helpful.

Best,
tison.

[1]
https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
[2]
https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit

Re: REST API / JarRunHandler: More flexibility for launching jobs

tison
By the way, currently the Dispatcher implements RestfulGateway and
delegates resource requests to the ResourceManager. If we could,
semantically, let the WebMonitor implement RestfulGateway and delegate
job requests to the Dispatcher and resource requests to the
ResourceManager, it would seem reasonable that when the WebMonitor
receives a JarRun request, it spawns a process and runs the main method
of the main class of that jar.

Best,
tison.

Re: REST API / JarRunHandler: More flexibility for launching jobs

Thomas Weise
Thanks for looking into this.

I see the "jar run handler" as a function that takes a few parameters and
returns a job ID. I think it would be nice if the handler didn't hard-code
that function. Perhaps this could be accomplished by pushing the code into
something like an "ExtractJobGraphAndSubmitToDispatcherEnvironment" that the
main method could also bypass if it has an alternative way to provide the
job ID via a context variable?
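
A minimal sketch of that context-variable idea (every name here is made
up; nothing like it exists in Flink today):

import java.util.concurrent.atomic.AtomicReference;

import org.apache.flink.api.common.JobID;

/** Hypothetical hand-off between the user's main() and the jar run handler. */
public final class JarRunContext {

    private static final AtomicReference<JobID> SUBMITTED_JOB =
        new AtomicReference<>();

    private JarRunContext() {}

    /** Called by user code after it submitted the job through its own client. */
    public static void reportSubmittedJob(JobID jobId) {
        SUBMITTED_JOB.set(jobId);
    }

    /**
     * Called by the jar run handler after main() returns. A non-null value
     * tells the handler to skip JobGraph extraction and return this ID.
     */
    public static JobID getSubmittedJob() {
        return SUBMITTED_JOB.get();
    }
}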

Zili: I looked at the client API proposal and left a few comments. I think
it is important to improve programmatic job submission. But it also seems
orthogonal to how the jar run handler operates (i.e. these issues could be
addressed independently).

Chesnay: You are right that the Beam job server could be hacked to extract
the job graph and other ingredients. This isn't desirable though, because
these Flink internals should not be exposed downstream. But even if we went
down that route, we would still need a way to let the jar run handler know
to just return the ID of an already submitted job vs. trying to submit one
from OptimizerPlanEnvironment.

The intended sequence would be:

1. REST client provides a launcher jar
2. REST client "runs jar"
3. REST handler calls main()
4. main() launches the Beam job server and runs the Beam pipeline
   construction code against that job server
5. the job server uses RemoteEnvironment to submit the real job
6. main() "returns" the job ID
7. REST handler returns the job ID
Thomas

Re: REST API / JarRunHandler: More flexibility for launching jobs

tison
Hi Thomas,

If the REST handler just calls main(), the behavior inside main() is
unpredictable.

Currently the jar run handler extracts the job graph and submits it
with the job ID configured in the REST request. If the REST handler
simply called main(), we could hardly even know how many jobs were
executed.

A new environment like the
ExtractJobGraphAndSubmitToDispatcherEnvironment you describe could be
added to satisfy your requirement. However, it is a bit out of Flink's
scope. It might be better to write your own REST handler.

WebMonitorExtension is meant for extending the REST handlers, but it
seems it cannot be customized for this either...

Best,
tison.

Re: REST API / JarRunHandler: More flexibility for launching jobs

Thomas Weise
If the goal is to keep job creation and job submission separate, and we
agree that there should be more flexibility for job construction, then
JobGraph and friends should be a stable API that users can depend on. If
that's the case, the path Chesnay pointed to may become viable.

There was discussion in the past that JobGraph cannot be relied on w.r.t.
backward compatibility, and I would expect that at some point we will want
to move to a representation that allows for cross-version compatibility.
Beam is an example of how this could be accomplished (with its pipeline
proto).

So if the Beam job server was able to produce the JobGraph, is there
agreement that we should provide a mechanism that allows the program entry
point to return the JobGraph directly (without using the
ExecutionEnvironment to build it)?


On Mon, Aug 5, 2019 at 2:10 AM Zili Chen <[hidden email]> wrote:

> Hi Thomas,
>
> If REST handler calls main(), the behavior inside main() is
> unpredictable.
>
> Now the jar run handler extract the job graph and submit
> it with the job id configured in REST request. If REST
> handler calls main() we can hardly even know how much
> jobs are executed.
>
> A new environment, as you said,
> ExtractJobGraphAndSubmitToDispatcherEnvironment can be
> added to satisfy your requirement. However, it is a bit
> out of Flink scope. It might be better to write your own
> REST handler.
>
> WebMonitorExtension is for extending REST handlers but
> it seems also unable to customize...
>
> Best,
> tison.
>
>
> Thomas Weise <[hidden email]> 于2019年8月3日周六 上午4:09写道:
>
> > Thanks for looking into this.
> >
> > I see the "Jar run handler" as function that takes few parameters and
> > returns a job ID. I think it would be nice if the handler doesn't hard
> code
> > the function. Perhaps this could be accomplished by pushing the code into
> > something like "ExtractJobGraphAndSubmitToDispatcherEnvironment" that the
> > main method could also bypass if it has an alternative way to provide the
> > jobId via a context variable?
> >
> > Zili: I looked at the client API proposal and left a few comments. I
> think
> > it is important to improve programmatic job submission. But it also seems
> > orthogonal to how the jar run handler operates (i.e. these issues could
> be
> > addressed independently).
> >
> > Chesnay: You are right that the Beam job sever could be hacked to extract
> > job graph and other ingredients. This isn't desirable though because
> these
> > Flink internals should not be exposed downstream. But even if we went
> down
> > that route we would still need a way to let the jar run handler know to
> > just return the ID of an already submitted job vs. trying to submit one
> > from OptimizerPlanEnvironment.
> >
> > The intended sequence would be:
> >
> > REST client provides a launcher jar
> > REST client "runs jar"
> > REST handler calls main()
> > main launches Beam job server, runs Beam pipeline construction code
> against
> > that job server
> > job server uses RemoteEnvironment to submit real job
> > main "returns job id"
> > REST handler returns job id
> >
> > Thomas
> >
> >
> > On Wed, Jul 31, 2019 at 4:33 AM Zili Chen <[hidden email]> wrote:
> >
> > > By the way, currently Dispatcher implements RestfulGateway
> > > and delegate resource request to ResourceManager. If we can,
> > > semantically, let WebMonitor implement RestfulGateway,
> > > and delegate job request to Dispatcher, resource request to
> > > ResourceManager, it seems reasonable that when WebMonitor
> > > receives a JarRun request, it spawns a process and run
> > > the main method of the main class of that jar.
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Zili Chen <[hidden email]> 于2019年7月31日周三 下午7:10写道:
> > >
> > >> I don't think the `Program` interface could solve the problem.
> > >>
> > >> The launcher launches the job server which creates the job graph,
> > >> submits it and keeps monitoring. Even if user program implement
> > >> `Program` Flink still extracts the JobGraph from `getPlan` and
> > >> submits it, instead of really execute codes in main method of
> > >> user program, so that the launcher is not started.
> > >>
> > >> @Thomas,
> > >>
> > >> Here is an ongoing discussion on client refactoring[1] as Till
> > >> mentioned. However, I'm afraid that with current jar run semantic,
> > >> i.e., extract the job graph and submit it to the Dispatcher, it cannot
> > >> fits your requirement. The problem is that REST API directly
> > >> communicates with Dispatcher and thus it's strange to tell the
> > >> Dispatcher "just run a program in a process".
> > >>
> > >> As you mentioned in the document, with CLI in session mode the
> > >> whole program would be executed sequentially. I'll appreciate it
> > >> if you can participant the thread on client refactor[1]. In the
> > >> design document[2], we propose to provide rich interfaces for
> > >> downstream projects integration. You can customize your CLI for
> > >> executing your program arbitrarily. Any requirement or advise
> > >> would be help.
> > >>
> > >> Best,
> > >> tison.
> > >>
> > >> [1]
> > >>
> >
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> > >> [2]
> > >>
> >
> https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit

Re: REST API / JarRunHandler: More flexibility for launching jobs

tison
It sounds like a request to change the interface Program into

public interface Program {
  JobGraph getJobGraph(String... args);
}

Also, given that JobGraph is said to be an internal interface that
cannot be relied on, we might introduce and use a
representation that allows for cross-version compatibility.
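
For illustration, a launcher jar could then implement the revised
interface roughly like this (a sketch only: the class name and the
translation step are hypothetical, and the existing
org.apache.flink.api.common.Program has a different signature):

import org.apache.flink.runtime.jobgraph.JobGraph;

// Hypothetical launcher entry point against the revised Program above:
// it builds the JobGraph from an external pipeline representation
// instead of having its main() method hijacked by a context environment.
public class LauncherProgram implements Program {

  @Override
  public JobGraph getJobGraph(String... args) {
    // Translation from the external representation (e.g. a Beam
    // pipeline proto) to a JobGraph is elided in this sketch.
    return translateExternalPipeline(args);
  }

  private JobGraph translateExternalPipeline(String... args) {
    throw new UnsupportedOperationException("elided in this sketch");
  }
}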


Thomas Weise <[hidden email]> wrote on Tue, Aug 6, 2019 at 12:11 AM:

> If the goal is to keep job creation and job submission separate and we
> agree that there should be more flexibility for the job construction, then
> JobGraph and friends should be stable API that the user can depend on. If
> that's the case, the path Chesnay pointed to may become viable.
>
> There was discussion in the past that JobGraph cannot be relied on WRT
> backward compatibility and I would expect that at some point we want to
> move to a representation that allows for cross version compatibility. Beam
> is an example how this could be accomplished (with its pipeline proto).
>
> So if the Beam job server was able to produce the JobGraph, is there
> agreement that we should provide a mechanism that allows the program entry
> point to return the JobGraph directly (without using the
> ExecutionEnvironment to build it)?

Re: REST API / JarRunHandler: More flexibility for launching jobs

Till Rohrmann
I think there was the idea to make the JobGraph a "public"/stable interface
that other projects can rely on at some point. If I remember correctly, we
wanted to define a protobuf definition for the JobGraph so that clients
written in a different language can submit JobGraphs and we could extend
the data structure. As far as I know, this effort hasn't been started yet
and is still in the backlog (I don't think a JIRA issue exists yet).

The problem came up when discussing additions to the JobGraph, because they
need to be backwards compatible; otherwise newer versions of Flink would
not be able to recover jobs. I think so far Flink provides backwards
compatibility between different versions of the JobGraph, but this is not
officially guaranteed.
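
To make the compatibility concern concrete: a cross-version representation
would typically freeze a small, versioned envelope and let only the payload
format evolve, along these lines (hypothetical Java sketch; Flink has no
such structure today):

import java.io.Serializable;

// Hypothetical version-tagged job description: the envelope stays fixed,
// readers dispatch on the version tag, and only the payload format
// (e.g. a serialized proto message) evolves between releases.
public final class VersionedJobDescription implements Serializable {
  private static final long serialVersionUID = 1L;

  private final int formatVersion; // bumped on incompatible payload changes
  private final byte[] payload;    // e.g. a serialized pipeline proto

  public VersionedJobDescription(int formatVersion, byte[] payload) {
    this.formatVersion = formatVersion;
    this.payload = payload.clone();
  }

  public int getFormatVersion() {
    return formatVersion;
  }

  public byte[] getPayload() {
    return payload.clone();
  }
}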

Cheers,
Till

On Tue, Aug 6, 2019 at 3:56 AM Zili Chen <[hidden email]> wrote:

> It sounds like a request to change the interface Program into
>
> public interface Program {
>   JobGraph getJobGraph(String... args);
> }
>
> Also, given that JobGraph is said to be an internal interface that
> cannot be relied on, we might introduce and use a
> representation that allows for cross-version compatibility.

Re: REST API / JarRunHandler: More flexibility for launching jobs

Aljoscha Krettek-2
Hi,

Regarding the original proposal: I don’t think spawning another process inside JarRunHandler.runJar() is the way to go here. Looking at the bigger picture, the proposal would get us to roughly this situation:

1. Spawn Kubernetes containers (JobManager and TaskManagers)
2. User does a REST call to JobManager.runJar() to submit the user job
3. JobManager.runJar() opens a port that waits for job submission
4. JobManager.runJar() invokes UserProgram.main()
5. UserProgram.main() launches a process (BeamJobService) that opens a port to wait for a Python process to connect to it
6. UserProgram.main() launches another process (the Python code, or any language, really) that connects to BeamJobService to submit the Pipeline
7. BeamJobService receives the Pipeline and talks to the port open on JobManager (via REST service, maybe) to submit the Job
8. Job is executed
9. Where is UserProgram.main() at this point?

I think that even running UserProgram.main() in the JobManager is already too much. A JobManager should accept JobGraphs (or something) and execute them, nothing more. Running UserProgram.main() makes some things complicated or weird. For example, what happens when that UserProgram.main() creates a RemoteEnvironment and uses that? What happens when the user code calls execute() multiple times?

I think a good solution for the motivating use case is to

a) run BeamJobService as a separate service that talks to a running JobManager via REST for submitting jobs that it receives

b) spawn a JobManager inside the BeamJobService, i.e. the BeamJobService is like the entry point in a per-job Kubernetes model. This is something that the new Executor work ([1], [2]) will enable.
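
As a rough illustration of option a), such a service could submit a
JobGraph it has built to the running cluster through Flink's REST client.
This is a sketch under stated assumptions: the host and port are made up,
and the exact submitJob() signature differs between Flink releases.

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;

// Hedged sketch of option a): a standalone job service that submits a
// JobGraph to an already running JobManager over REST.
public class StandaloneJobService {

  public JobID submit(JobGraph jobGraph) throws Exception {
    Configuration config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, "jobmanager"); // assumed host
    config.setInteger(RestOptions.PORT, 8081); // default REST port

    RestClusterClient<String> client =
        new RestClusterClient<>(config, "standalone");
    try {
      // Recent Flink versions return a future from submitJob(); older
      // versions take an additional ClassLoader argument instead.
      return client.submitJob(jobGraph).get();
    } finally {
      client.close();
    }
  }
}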

Any thoughts? I’m happy to jump on a call about this because these things are very tricky to figure out and I might be wrong.

Best,
Aljoscha

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
[2] https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?ts=5d88e631

> On 6. Aug 2019, at 09:51, Till Rohrmann <[hidden email]> wrote:
>
> I think there was the idea to make the JobGraph a "public"/stable interface
> that other projects can rely on at some point. If I remember correctly, we
> wanted to define a protobuf definition for the JobGraph so that clients
> written in a different language can submit JobGraphs and we could extend
> the data structure. As far as I know, this effort hasn't been started yet
> and is still in the backlog (I don't think a JIRA issue exists yet).
>
> The problem came up when discussing additions to the JobGraph, because they
> need to be backwards compatible; otherwise newer versions of Flink would
> not be able to recover jobs. I think so far Flink provides backwards
> compatibility between different versions of the JobGraph, but this is not
> officially guaranteed.
>
> Cheers,
> Till


Re: REST API / JarRunHandler: More flexibility for launching jobs

Thomas Weise
Hi Aljoscha,

Thanks for taking a look!

Multiple options to approach the submission part for the Beam use case are
discussed in [1].

I'm actually now working on a different approach that creates a Flink jar
at build time.

To the point of whether UserProgram.main() should be called in the
JobManager or not: I believe it is important to provide the user with an
option to submit a job without running a Java client. Today that
necessitates that the Java entry point be called on the JM.

The executor-related work as such won't change that. However, it would be
nice to have a separate mechanism that allows the user to specify an entry
point that produces the FlinkPipeline/plan, without having to "execute"
through the kind of hacky context environment.
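
For reference, the context-environment trick alluded to above works
roughly like this (a simplified sketch of the OptimizerPlanEnvironment
pattern, not the actual Flink code):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;

// Simplified sketch: the handler installs an environment whose execute()
// never runs the job; it captures the plan and then aborts main().
public class PlanCapturingEnvironment extends ExecutionEnvironment {

  private Plan capturedPlan;

  @Override
  public JobExecutionResult execute(String jobName) throws Exception {
    capturedPlan = createProgramPlan(jobName);
    // Abort the user's main() once the plan is captured; the caller
    // catches this and turns the plan into a JobGraph for submission.
    throw new RuntimeException("plan captured, aborting main()");
  }

  @Override
  public String getExecutionPlan() throws Exception {
    throw new UnsupportedOperationException("not needed in this sketch");
  }

  public Plan getCapturedPlan() {
    return capturedPlan;
  }
}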

Thanks,
Thomas

[1]
https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.fh2f571kms4d




On Thu, Sep 26, 2019 at 7:15 AM Aljoscha Krettek <[hidden email]>
wrote:

> Hi,
>
> Regarding the original proposal: I don’t think spawning another process
> inside JarRunHandler.runJar() is the way to go here. Looking at the bigger
> picture, the proposal would get us to roughly this situation:
>
> 1. Spawn Kubernetes containers (JobManager and TaskManagers)
> 2. User does a REST call to JobManager.runJar() to submit the user job
> 3. JobManager.runJar() opens a port that waits for job submission
> 4. JobManager.runJar() invokes UserProgram.main()
> 5. UserProgram.main() launches a process (BeamJobService) that opens a
> port to wait for a Python process to connect to it
> 6. UserProgram.main() launches another process (the Python code, or any
> language, really) that connects to BeamJobService to submit the Pipeline
> 7. BeamJobService receives the Pipeline and talks to the port open on
> JobManager (via REST service, maybe) to submit the Job
> 8. Job is executed
> 9. Where is UserProgram.main() at this point?
>
> I think that even running UserProgram.main() in the JobManager is already
> too much. A JobManager should accept JobGraphs (or something) and execute
> them, nothing more. Running UserProgram.main() makes some things
> complicated or weird. For example, what happens when that
> UserProgram.main() creates a RemoteEnvironment and uses that? What happens
> when the user code calls execute() multiple times?
>
> I think a good solution for the motivating use case is to
>
> a) run BeamJobService as a separate service that talks to a running
> JobManager via REST for submitting jobs that it receives
>
> b) spawn a JobManager inside the BeamJobService, i.e. the
> BeamJobService is like the entry point in a per-job Kubernetes model.
> This is something that the new Executor work ([1], [2]) will enable.
>
> Any thoughts? I’m happy to jump on a call about this because these things
> are very tricky to figure out and I might be wrong.
>
> Best,
> Aljoscha
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
> [2]
> https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?ts=5d88e631

Re: REST API / JarRunHandler: More flexibility for launching jobs

Aljoscha Krettek-2
Hi Thomas,

OK, I agree with the necessity of running a job without a Java client. FLIP-73 aims at introducing a Pipeline (or FlinkPipeline) interface that serves as the common interface of StreamGraph and Plan. Maybe we could reintroduce something like Program that returns a Pipeline for this purpose.
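
A rough sketch of what that could look like (purely illustrative: Pipeline
stands for the not-yet-existing FLIP-73 interface, and none of these names
are committed API):

public interface Program {
    // Hypothetical: instead of Flink extracting a JobGraph from main(),
    // the entry point hands back the FLIP-73 Pipeline directly.
    Pipeline getPipeline(String... args);
}

The jar run handler could then detect this interface on the main class and,
when present, call it instead of main(), as proposed at the start of this
thread.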

Best,
Aljoscha



Re: REST API / JarRunHandler: More flexibility for launching jobs

tison
Aljoscha & Thomas,

The Program approach won't work without also exposing pipeline compilation
support, because otherwise there is no way for the user to create a Pipeline
themselves.

I am not sure what "running a job without a Java client" means above; if it
means the server/cluster is the Dispatcher, then a client in another language
would need to understand JobGraph-level concepts too.

Hi Thomas, I would highly appreciate it if you could set the interface
audience issue in Flink aside for a while and describe exactly what Beam
wants to bridge between

- a program written by a Beam user, which describes a Beam Pipeline and
executes Pipeline#run

and

- a running Flink job.

For example: within the Beam scope, when Pipeline#run is executed, Beam
creates a Flink <what> and submits it to <where> by <what>.

If, with FLIP-73, Pipeline is a user-facing Flink job description, I would
suggest exposing the Executor and a method to get the Pipeline from the
Environment, so that Beam can create a Flink Pipeline and deploy the job
via the Executor; see the sketch below.
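
In code, roughly (every name below is hypothetical and only meant to make
the shape concrete; neither getPipeline() nor this Executor exists today):

// Beam's translator fills the environment as usual ...
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// ... then, instead of calling env.execute(), Beam extracts the Pipeline
// and hands it to an Executor that deploys it:
Pipeline pipeline = env.getPipeline();               // hypothetical accessor
Executor executor = ExecutorFactory.create(config);  // hypothetical FLIP-73 factory
JobClient jobClient = executor.execute(pipeline);    // handle carrying the job ID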

Best,
tison.



Re: REST API / JarRunHandler: More flexibility for launching jobs

Thomas Weise
The following methods of job submission are relevant (the first two are
general Flink, not Beam-specific):

1) The Flink CLI, which submits the job from a Java client environment.

2) Non-JVM tools like the FlinkK8sOperator that use the REST API to launch
the job (and optionally also upload the jar, which otherwise could be baked
into an image at build time).

3) The Beam runner also has the option to use the Remote(Stream)Environment
directly to submit the job. This scenario applies when the Beam job server is
the client, as shown in the doc. In this case it is important that settings
like parallelism and the savepoint path are supported by the programmatic
API; a sketch follows below.
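
A minimal sketch of (3), assuming the Flink 1.9-era RemoteStreamEnvironment
constructor that accepts SavepointRestoreSettings (host, port, paths, and
parallelism below are placeholders):

import java.net.URL;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;

public class RemoteSubmissionSketch {
    public static void main(String[] args) throws Exception {
        // Restore from a savepoint if one was requested.
        SavepointRestoreSettings restore =
            SavepointRestoreSettings.forPath("/savepoints/sp-1", false);

        RemoteStreamEnvironment env = new RemoteStreamEnvironment(
            "jobmanager-host", 8081, new Configuration(),
            new String[] {"/path/to/job.jar"}, new URL[0], restore);
        env.setParallelism(4);

        // The Beam job server would add the translated operators here;
        // a trivial pipeline stands in for that.
        env.fromElements(1, 2, 3).print();

        // Blocks until the job finishes; the result carries the job ID.
        env.execute("remote-submission-sketch");
    }
}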

HTH,
Thomas

