Hi all,
Currently, the most convenient way of programmatically submitting a job to a running session cluster is using Flink’s RestClusterClient. Unfortunately, as of now it only supports submitting a JobGraph.[1] Constructing a JobGraph from a jar file requires additional Flink dependencies, which is not ideal.

It is also possible to use the Flink REST API directly: first upload the jar file via /jars/upload[2] and then run it via /jars/:jarid/run[3]. The downside is that it is impossible to set a Flink execution configuration; the job always uses the configuration of the underlying session cluster.

I know changing the ClusterClient has already been discussed, but it would involve a lot of effort. So what do you think of making jar execution via the REST endpoint more prominent by adding the option to pass an execution configuration?

Best,
Fabian

[1] https://github.com/apache/flink/blob/65cd385d7de504a946b17193aceea73b3c8e78a8/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java#L95
[2] https://github.com/apache/flink/blob/c2972b6e336cc3b3a6cbd22c69a6710dab5246e6/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java#L56
[3] https://github.com/apache/flink/blob/c2972b6e336cc3b3a6cbd22c69a6710dab5246e6/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java#L56
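A minimal sketch of the second step of the REST flow described above, using only the JDK's java.net.http client (Java 11+). It builds, but does not send, the POST /jars/:jarid/run request. Assumptions: the REST address http://localhost:8081, the example upload path, and the class and method names are all hypothetical; the multipart /jars/upload call is omitted; and only the entryClass and parallelism fields of the run request body are shown. As far as I know, the upload response reports the stored jar path under "filename", and its last path segment is the jar id. The sketch illustrates the limitation raised above: the run request body carries program-level settings, not an execution configuration.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class JarRunRequestSketch {

    // Assumption: the jar id is the last path segment of the "filename"
    // value returned by POST /jars/upload.
    static String jarIdFromUploadFilename(String filename) {
        return filename.substring(filename.lastIndexOf('/') + 1);
    }

    // Builds a minimal JSON body for /jars/:jarid/run. No JSON escaping is
    // done here (sketch only), so entryClass must not contain quotes.
    static String runRequestBody(String entryClass, int parallelism) {
        return String.format(
                "{\"entryClass\":\"%s\",\"parallelism\":%d}", entryClass, parallelism);
    }

    // Builds (but does not send) the run request against a hypothetical REST address.
    static HttpRequest buildRunRequest(String restBase, String jarId,
                                       String entryClass, int parallelism) {
        return HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(runRequestBody(entryClass, parallelism)))
                .build();
    }

    public static void main(String[] args) {
        String jarId = jarIdFromUploadFilename("/tmp/flink-web-upload/abc_my-job.jar");
        HttpRequest req = buildRunRequest("http://localhost:8081", jarId, "com.example.MyJob", 2);
        // prints POST http://localhost:8081/jars/abc_my-job.jar/run
        System.out.println(req.method() + " " + req.uri());
    }
}
```

Sending it would be a matter of HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString()) against a real session cluster.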
Hi Fabian,
thanks for starting this discussion. In general, I would be a bit hesitant to build upon Flink's web UI submission because it suffers from a couple of drawbacks:

1) The web UI submission only supports single-job applications.
2) The JobGraph is generated from within the web UI's Netty thread. Hence, if the user code blocks, this can make the web UI unresponsive.
3) Uploaded jars are not persisted. Hence, if a JobManager failover occurs between uploading and running the job, the uploaded jars might be lost.

The reason for some of these problems is that the feature was originally implemented for a conference and has remained almost untouched ever since. Building more functionality on top of it will make it harder to remove in the future.

Cheers,
Till

On Tue, Dec 8, 2020 at 12:00 PM Fabian Paul <[hidden email]> wrote:
I'm surprised that this is not possible currently. Seems like a glaring
missing feature to me.

I'd assume the best way would be to extend the REST endpoint /jars/:jarid/run with an option to override configuration values. I'm not sure how to map JSON well onto the YAML structure of the config, but I guess we mostly have simple key/value pairs anyway.

On Tue, Dec 8, 2020 at 1:31 PM Till Rohrmann <[hidden email]> wrote:
--
Arvid Heise | Senior Java Developer
<https://www.ververica.com/>
Follow us @VervericaData
--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
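On the JSON-vs-YAML question above, one possible approach is sketched here, assuming the overrides arrive as a JSON object that has already been parsed into java.util.Map values: flatten nested objects into Flink's flat, dot-separated keys, so that {"execution": {"checkpointing": {"interval": "10 s"}}} and {"execution.checkpointing.interval": "10 s"} end up identical. The class and method names are hypothetical; this is not an existing Flink API or request format.

```java
import java.util.Map;
import java.util.TreeMap;

public class ConfigOverrideFlattener {

    // Flattens a nested map into flat "a.b.c" keys with string values,
    // mirroring how entries appear in flink-conf.yaml. A TreeMap keeps keys sorted.
    static Map<String, String> flatten(Map<String, ?> nested) {
        Map<String, String> flat = new TreeMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    private static void flattenInto(String prefix, Map<String, ?> node, Map<String, String> out) {
        for (Map.Entry<String, ?> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map) {
                // Nested JSON object: recurse, extending the dotted key prefix.
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) value;
                flattenInto(key, child, out);
            } else {
                // Leaf value (or an already-flat key such as "parallelism.default").
                out.put(key, String.valueOf(value));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> overrides = Map.of(
                "parallelism.default", 4,
                "execution", Map.of("checkpointing", Map.of("interval", "10 s")));
        // prints {execution.checkpointing.interval=10 s, parallelism.default=4}
        System.out.println(flatten(overrides));
    }
}
```

The resulting Map<String, String> has the same shape as the key/value pairs in flink-conf.yaml, so it should be possible to turn it into a Flink Configuration with Configuration.fromMap(...) and merge it over the session cluster's defaults. Lists and other non-scalar values are simply stringified here and would need a dedicated rule.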
Actually, I think the key point is that the Flink client is not friendly to
deployers. Most companies have their own deployers, and I believe many of them depend on the CLI commands (e.g. "flink run" / "flink run-application").

I am not sure whether using the RestClusterClient is the best choice, but we could have an alternative as follows:

// Set the configuration based on the deployment mode (session, per-job)
Configuration flinkConfig = new Configuration();
... ...
flinkConfig.setString("execution.target", "kubernetes-session");
flinkConfig.setString("kubernetes.cluster-id", "my-flink-k8s-session");
// Build a packaged program
PackagedProgram program = PackagedProgram.newBuilder().setConfiguration(flinkConfig)...build();
// Run the packaged program on the deployment. Maybe we also need to set the Context.
program.invokeInteractiveModeForExecution();

In my opinion, PackagedProgram is more appropriate for jar submission.

Best,
Yang

Arvid Heise <[hidden email]> wrote on Tue, Dec 8, 2020 at 9:39 PM:
To me, creating the PackagedProgram on the client side is a bad idea, for at least two reasons:

1. You must ensure the client has almost the same classpath as the Flink cluster; otherwise you can face problems deserializing the submitted JobGraph. (For example, Jackson automatically tries to load modules that are found on the client classpath when using Spring but not on the JobManager. That's exactly what happened to me initially.)
2. Even if you manage to create the PackagedProgram correctly, job listeners are not fired.

So I ended up extending the RestClusterClient in order to use uploadJar + runJob; you can look at the extended class at [1]. Unfortunately, I still have to figure out whether dynamic classloading is closed correctly by the JobManagers and TaskManagers, because I suspect that tasks are not finalized correctly, as detected for Python in [2].

[1] https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java
[2] https://issues.apache.org/jira/browse/FLINK-20333

Best,
Flavio

On Wed, Dec 9, 2020 at 12:43 PM Yang Wang <[hidden email]> wrote: