[DISCUSS] Programmatically submit Flink job jar to session cluster

Fabian Paul
Hi all,

Currently, the most convenient way to programmatically submit a job to a running session cluster is Flink’s RestClusterClient.
Unfortunately, as of now it only supports submitting a job graph.[1] Constructing a job graph from a jar file requires additional Flink dependencies, which is not ideal.

It is also possible to use the Flink REST API directly: first upload the jar file via /jars/upload[2] and then run it via /jars/:jarid/run[3]. The downside is that it is impossible to set a Flink execution configuration; the job always takes the underlying session cluster configuration.
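
A minimal sketch of that two-step REST flow, using only the JDK's java.net.http classes (Java 11+). It builds the two requests without sending them. The endpoint paths (/jars/upload and /jars/:jarid/run), the "jarfile" form field, and the "entryClass"/"parallelism" body fields follow the Flink REST API; the class name, the base URL, the entry class, and the sample file name are hypothetical, and treating the last path segment of the upload response's "filename" as the jar id is an assumption.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Sketch only: builds the two REST requests but does not send them. */
final class JarSubmissionSketch {

    // Arbitrary multipart boundary chosen for this sketch.
    private static final String BOUNDARY = "flinkJarUploadBoundary";

    /** Request for POST /jars/upload with the jar as multipart form field "jarfile". */
    static HttpRequest uploadRequest(String baseUrl, String jarName, byte[] jarBytes) {
        byte[] head = ("--" + BOUNDARY + "\r\n"
                + "Content-Disposition: form-data; name=\"jarfile\"; filename=\"" + jarName + "\"\r\n"
                + "Content-Type: application/x-java-archive\r\n\r\n")
                .getBytes(StandardCharsets.UTF_8);
        byte[] tail = ("\r\n--" + BOUNDARY + "--\r\n").getBytes(StandardCharsets.UTF_8);

        // Concatenate part header, jar content, and closing delimiter.
        byte[] body = new byte[head.length + jarBytes.length + tail.length];
        System.arraycopy(head, 0, body, 0, head.length);
        System.arraycopy(jarBytes, 0, body, head.length, jarBytes.length);
        System.arraycopy(tail, 0, body, head.length + jarBytes.length, tail.length);

        return HttpRequest.newBuilder(URI.create(baseUrl + "/jars/upload"))
                .header("Content-Type", "multipart/form-data; boundary=" + BOUNDARY)
                .POST(HttpRequest.BodyPublishers.ofByteArray(body))
                .build();
    }

    /** Assumption: the jar id is the last path segment of "filename" in the upload response. */
    static String jarIdFromFilename(String filename) {
        return filename.substring(filename.lastIndexOf('/') + 1);
    }

    /** Request for POST /jars/:jarid/run; only entry class and parallelism are set. */
    static HttpRequest runRequest(String baseUrl, String jarId, String entryClass, int parallelism) {
        String json = "{\"entryClass\":\"" + entryClass + "\",\"parallelism\":" + parallelism + "}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        String base = "http://localhost:8081"; // hypothetical JobManager REST address
        HttpRequest upload = uploadRequest(base, "job.jar", new byte[0]);
        String jarId = jarIdFromFilename("/tmp/flink-web-upload/1234_job.jar"); // made-up value
        HttpRequest run = runRequest(base, jarId, "com.example.MyJob", 2);
        System.out.println(upload.method() + " " + upload.uri());
        System.out.println(run.method() + " " + run.uri());
    }
}
```

To actually submit, each request could be passed to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). Note that the run body in this sketch carries only job-level fields; as described above, there is no place to put an execution configuration.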

I know changing the ClusterClient has already been discussed. It would involve a lot of effort, so what do you think of making jar execution more prominent via the REST endpoint by adding an option to pass an execution configuration?

Best,
Fabian

[1] https://github.com/apache/flink/blob/65cd385d7de504a946b17193aceea73b3c8e78a8/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java#L95
[2] https://github.com/apache/flink/blob/c2972b6e336cc3b3a6cbd22c69a6710dab5246e6/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java#L56
[3] https://github.com/apache/flink/blob/c2972b6e336cc3b3a6cbd22c69a6710dab5246e6/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java#L56

Re: [DISCUSS] Programmatically submit Flink job jar to session cluster

Till Rohrmann
Hi Fabian,

Thanks for starting this discussion. In general, I would be a bit hesitant
to build upon Flink's web UI submission because it suffers from a couple of
drawbacks:

1) The web UI submission only supports single-job applications.
2) The JobGraph is generated from within the web UI Netty thread. Hence, if
the user code blocks, it can make the web UI unresponsive.
3) Uploaded jars are not persisted. Hence, if a JobManager failover occurs
between uploading and running the job, you may lose the uploaded jars.

The reason for some of these problems is that the feature was originally
implemented for a conference and has remained almost untouched ever since.
Building more functionality on top of it will make it harder to remove in
the future.

Cheers,
Till


Re: [DISCUSS] Programmatically submit Flink job jar to session cluster

Arvid Heise
I'm surprised that this is not currently possible. It seems like a glaring
missing feature to me.

I'd assume the best way would be to extend the REST endpoint
/jars/:jarid/run with an option to override configuration values. I'm not
sure how to map JSON well to the YAML structure of the config, but I guess
we mostly have simple key/value pairs anyway.
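
One way the JSON-to-config mapping could work, sketched under the assumption that configuration overrides arrive as a (possibly nested) JSON object already parsed into Java maps: nested objects are flattened into dotted keys, which matches the simple key/value shape mentioned above. The class and method names are hypothetical and this is not an existing Flink API; parallelism.default and pipeline.name are used only as sample keys.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: flattens nested maps into dotted configuration keys. */
final class ConfigFlattenSketch {

    /** Returns a flat view, e.g. {"parallelism": {"default": 4}} becomes parallelism.default=4. */
    static Map<String, String> flatten(Map<String, ?> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    private static void flattenInto(String prefix, Map<String, ?> node, Map<String, String> out) {
        for (Map.Entry<String, ?> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Nested object: recurse and extend the dotted prefix.
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) value;
                flattenInto(key, child, out);
            } else {
                // Leaf value: store its string form under the full dotted key.
                out.put(key, String.valueOf(value));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> overrides = new LinkedHashMap<>();
        overrides.put("parallelism", Map.of("default", 4));
        overrides.put("pipeline.name", "demo");
        flatten(overrides).forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```

Keys that are already dotted pass through unchanged, so a client could send either the nested or the flat form.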

--

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: [DISCUSS] Programmatically submit Flink job jar to session cluster

Yang Wang
Actually, I think the key point is that the Flink client is not friendly to
deployers.
Most companies have their own deployers, and I believe many of them depend
on the CLI commands (e.g. "flink run" / "flink run-application").

I am not sure whether using the REST cluster client is the best choice, but
we could have an alternative as follows.

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;

// Set the configuration based on the deployment mode (session, per-job)
Configuration flinkConfig = new Configuration();
... ...
flinkConfig.setString("execution.target", "kubernetes-session");
flinkConfig.setString("kubernetes.cluster-id", "my-flink-k8s-session");
// Build a packaged program
PackagedProgram program =
    PackagedProgram.newBuilder().setConfiguration(flinkConfig)...build();
// Run the packaged program on the deployment. Maybe we also need to set the
// Context.
program.invokeInteractiveModeForExecution();

In my opinion, the PackagedProgram is more appropriate for jar submission.
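
For comparison, the CLI-based route that many deployers rely on today can be sketched as a plain command line assembled in Java. This assumes a Flink version whose CLI accepts --target and -D dynamic properties; the class name, the path to the flink binary, and the jar path are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Sketch only: assembles a "flink run" command line for an external deployer. */
final class FlinkCliCommandSketch {

    /** Builds: <flinkBin> run --target <target> -Dkey=value ... <jarPath> */
    static List<String> runCommand(
            String flinkBin, String target, Map<String, String> dynamicProps, String jarPath) {
        List<String> command = new ArrayList<>();
        command.add(flinkBin);
        command.add("run");
        command.add("--target");
        command.add(target);
        // Each configuration override becomes one -Dkey=value argument.
        dynamicProps.forEach((key, value) -> command.add("-D" + key + "=" + value));
        command.add(jarPath);
        return command;
    }

    public static void main(String[] args) {
        List<String> command = runCommand(
                "./bin/flink",
                "kubernetes-session",
                Map.of("kubernetes.cluster-id", "my-flink-k8s-session"),
                "my-job.jar");
        // A deployer could hand this list to new ProcessBuilder(command).inheritIO().start().
        System.out.println(String.join(" ", command));
    }
}
```

The configuration values are the same ones set on flinkConfig above; the difference is that the deployer needs a local Flink distribution instead of Flink client dependencies on its classpath.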


Best,
Yang


Re: [DISCUSS] Programmatically submit Flink job jar to session cluster

Flavio Pompermaier
To me, creating the PackagedProgram on the client side is very bad, for at
least two reasons:
   1. You must ensure that the client has almost the same classpath as the
Flink cluster; otherwise you can run into problems deserializing the
submitted job graph. For example, when using Spring, Jackson automatically
tries to create modules that can be found on the client classpath but not
on the job manager. That is exactly what happened to me initially.
   2. Even if you manage to create the PackagedProgram correctly, job
listeners are not fired.

So I ended up extending the RestClusterClient in order to use uploadJar +
runJob; you can look at the extended class at [1].
Unfortunately, I still have to figure out whether dynamic classloading is
closed correctly by the job managers and task managers, because I suspect
that tasks are not finalized correctly, as detected for Python at [2].

[1]
https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java
[2] https://issues.apache.org/jira/browse/FLINK-20333

Best,
Flavio
