Submitting job with savepoint through StreamExecutionEnvironment


Thomas Weise
Hi,

Currently it is not possible to submit a job with a savepoint restore
option through the execution environment. I found that while attempting
to add this support to the Flink runner in Beam
(https://issues.apache.org/jira/browse/BEAM-5396).

I also found https://issues.apache.org/jira/browse/FLINK-9644, but is
there a plan to support restoring from a savepoint through
StreamExecutionEnvironment in general?

Thanks,
Thomas

Re: Submitting job with savepoint through StreamExecutionEnvironment

Chesnay Schepler
I'm not aware of any plans to expose this in the StreamExecutionEnvironment.

The issue would be that we would start mixing submission details with
the job definition, which results in redundancy and weird semantics;
e.g., which savepoint configuration takes priority if both the job and
the CLI job submission specify one?
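To make the precedence question concrete, here is a minimal Java sketch under stated assumptions: `RestoreSettings` is a hypothetical stand-in for Flink's `SavepointRestoreSettings` (a savepoint path plus an "allow non-restored state" flag), and the rule that submission-time settings (for example `flink run -s <path>`) override settings baked into the job is only one possible policy, not a description of Flink's behavior.

```java
import java.util.Optional;

/** Hypothetical stand-in for Flink's SavepointRestoreSettings; not the real class. */
final class RestoreSettings {
    final String path;                   // savepoint to restore from
    final boolean allowNonRestoredState; // tolerate state without a matching operator

    RestoreSettings(String path, boolean allowNonRestoredState) {
        this.path = path;
        this.allowNonRestoredState = allowNonRestoredState;
    }
}

final class RestorePrecedence {
    private RestorePrecedence() {}

    /**
     * One possible (assumed) policy: settings supplied at submission time,
     * e.g. on the CLI, override settings baked into the job definition.
     */
    static Optional<RestoreSettings> resolve(Optional<RestoreSettings> fromJob,
                                             Optional<RestoreSettings> fromSubmission) {
        return fromSubmission.isPresent() ? fromSubmission : fromJob;
    }
}
```

Whatever rule is picked, it has to live somewhere outside the job itself, which is exactly the duplication of concerns described above.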

Looking at the Beam JIRA, it would be sufficient to have this in the
RemoteStreamEnvironment (which would be /less/ problematic, since the
issue above is baked into this class anyway); however, I would recommend
migrating to a ClusterClient for these use cases.

On 29.11.2018 08:18, Thomas Weise wrote:


Re: Submitting job with savepoint through StreamExecutionEnvironment

Thomas Weise
Thanks for taking a look.

Are you saying that the longer-term direction is to get rid of the execute
method from StreamExecutionEnvironment and instead construct the cluster
client outside?

That would currently expose even more internals to the user. Considering
the current implementation in RemoteStreamEnvironment:

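    @Override
    public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
        // Translate the registered transformations into a StreamGraph.
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);
        // Clear the recorded transformations so the next execute() starts empty.
        transformations.clear();
        // Ship the graph and the user jars to the remote cluster; this path
        // offers no way to pass savepoint restore settings.
        return executeRemotely(streamGraph, jarFiles);
    }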

We would use
env.getStreamGraph().getJobGraph().setSavepointRestoreSettings(..) in the
Beam code and then use the cluster client directly.
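A minimal sketch of this first option, assuming simplified stand-ins: `RestoreSettingsStub`, `JobGraphStub` and `ClusterClientStub` are hypothetical classes that only mirror the roles of Flink's `SavepointRestoreSettings`, `JobGraph` and `ClusterClient`. The real client's submission method has a different signature that varies across Flink versions, and the stub client here merely records what it was given.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; not Flink's real classes.
final class RestoreSettingsStub {
    static final RestoreSettingsStub NONE = new RestoreSettingsStub(null, false);
    final String path;                   // null means "do not restore"
    final boolean allowNonRestoredState; // tolerate state without a matching operator

    RestoreSettingsStub(String path, boolean allowNonRestoredState) {
        this.path = path;
        this.allowNonRestoredState = allowNonRestoredState;
    }

    boolean restoreSavepoint() { return path != null; }
}

final class JobGraphStub {
    final String jobName;
    private RestoreSettingsStub restoreSettings = RestoreSettingsStub.NONE;

    JobGraphStub(String jobName) { this.jobName = jobName; }

    void setSavepointRestoreSettings(RestoreSettingsStub settings) { this.restoreSettings = settings; }

    RestoreSettingsStub getSavepointRestoreSettings() { return restoreSettings; }
}

/** Records submissions instead of talking to a cluster. */
final class ClusterClientStub {
    final List<JobGraphStub> submitted = new ArrayList<>();

    void submitJob(JobGraphStub jobGraph) { submitted.add(jobGraph); }
}

final class DirectSubmission {
    private DirectSubmission() {}

    /** Option 1: the runner sets restore settings on the job graph and submits it itself. */
    static void submitWithSavepoint(ClusterClientStub client, JobGraphStub jobGraph, String savepointPath) {
        jobGraph.setSavepointRestoreSettings(new RestoreSettingsStub(savepointPath, false));
        client.submitJob(jobGraph);
    }
}
```

The cost of this option is visible in the sketch: the caller must know about job graphs and clients, which are the internals mentioned above.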

If we wanted to keep this hidden from users, we could add
setSavepointRestoreSettings to RemoteStreamEnvironment and
LocalStreamEnvironment and deal with it internally.
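The second option could look roughly like this hypothetical sketch: the environment keeps the restore path private and applies it inside `execute()`, so user code never touches the job graph. `RestoringRemoteEnvironment`, `SubmittedJob` and the `String`-typed setter are illustrative assumptions, not an existing Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical record of what was submitted; stands in for a JobGraph on a cluster. */
final class SubmittedJob {
    final String name;
    final String restorePath; // null: start without restoring state

    SubmittedJob(String name, String restorePath) {
        this.name = name;
        this.restorePath = restorePath;
    }
}

/** Hypothetical environment that keeps restore settings internal and applies them in execute(). */
class RestoringRemoteEnvironment {
    private String savepointRestorePath;                      // null until the setter is called
    final List<SubmittedJob> submissions = new ArrayList<>(); // stands in for the remote cluster

    void setSavepointRestoreSettings(String savepointPath) {
        this.savepointRestorePath = savepointPath;
    }

    void execute(String jobName) {
        // A real environment would first translate its pipeline into a job graph;
        // here only the name and the restore path are recorded.
        submissions.add(new SubmittedJob(jobName, savepointRestorePath));
    }
}
```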

Alternatively, the remote environment could serve just as a cluster client
factory.
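For the third option, a hypothetical sketch in which the remote environment only holds connection details and hands out clients, leaving submission concerns such as savepoint restore to the caller. `RemoteEnvironmentAsFactory`, `ClientStub` and `createClusterClient()` are invented names, and the host and port in the usage are example values.

```java
// Hypothetical stand-in for a cluster client; it only knows where the cluster is.
final class ClientStub {
    final String host;
    final int port;

    ClientStub(String host, int port) {
        this.host = host;
        this.port = port;
    }

    String endpoint() { return "http://" + host + ":" + port; }
}

/** Hypothetical environment reduced to a factory for clients. */
final class RemoteEnvironmentAsFactory {
    private final String host;
    private final int port;

    RemoteEnvironmentAsFactory(String host, int port) {
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("invalid port: " + port);
        }
        this.host = host;
        this.port = port;
    }

    /** The caller (e.g. a runner) owns submission details such as savepoint restore. */
    ClientStub createClusterClient() { return new ClientStub(host, port); }
}
```

For example, `new RemoteEnvironmentAsFactory("jobmanager", 8081).createClusterClient().endpoint()` yields `http://jobmanager:8081`.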

WDYT?


On Thu, Nov 29, 2018 at 2:35 AM Chesnay Schepler <[hidden email]> wrote:


Re: Submitting job with savepoint through StreamExecutionEnvironment

Chesnay Schepler
I'm only voicing my opinion here; this does not reflect any long-term
direction in any way.

I wouldn't remove the execute() method; it's too important for a
convenient execution of jobs via the CLI/WebUI.

But I would like to get rid of this distinction between environments, as
their existence implies that there is no single way to /write/ a Flink
job: jobs can differ in the environment they use, which affects whether
you can even run them via the CLI.

Have you ever tried setting up a jar that you can run both via the CLI
on the associated cluster and from the IDE against that same cluster? You
would need two entry points, each of which creates the environment and
passes it to your job-defining method. This is quite different from a
normal job, where the environment is created right before you define
your job.
Yet at the same time, you can write a job that simply uses a
StreamExecutionEnvironment and can either run locally in the IDE or be
submitted via the CLI.
This just seems highly inconsistent to me.
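The two-entry-point layout described above can be sketched as follows, assuming a hypothetical minimal `Env` interface in place of `StreamExecutionEnvironment`. In a real project `cliMain` and `ideMain` would be two `main` methods in separate classes; here they are plain static methods so that both paths visibly share the single job-defining method `define`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal environment; stands in for StreamExecutionEnvironment.
interface Env {
    void addOperator(String name);
    String execute(String jobName);
}

/** Records the pipeline and reports where it would have run. */
final class RecordingEnv implements Env {
    private final String target;
    final List<String> operators = new ArrayList<>();

    RecordingEnv(String target) { this.target = target; }

    public void addOperator(String name) { operators.add(name); }

    public String execute(String jobName) { return jobName + "@" + target + " " + operators; }
}

final class WordCountJob {
    private WordCountJob() {}

    /** Job-defining method: receives the environment instead of creating it. */
    static String define(Env env) {
        env.addOperator("source");
        env.addOperator("count");
        env.addOperator("sink");
        return env.execute("word-count");
    }

    /** Entry point 1: used when the jar is submitted via the CLI. */
    static String cliMain() { return define(new RecordingEnv("cli-context")); }

    /** Entry point 2: used from the IDE against the same remote cluster. */
    static String ideMain() { return define(new RecordingEnv("remote:jobmanager:8081")); }
}
```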

There are some ways one could deal with this; for example, in our
tests we basically inject a job executor into the default
StreamExecutionEnvironment. One could also resort to an approach that
uses system properties to determine how jobs should be executed.
However, I haven't thought about these thoroughly.
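A hypothetical sketch of the two approaches mentioned, injection and system properties, assuming an invented `JobExecutor` interface and an invented property name `example.execution.mode` (not a real Flink configuration key). An injected executor, as a test harness would provide, takes priority over the property-based choice.

```java
/** Hypothetical executor abstraction: runs a job and describes what happened. */
interface JobExecutor {
    String run(String jobName);
}

final class DefaultEnvironment {
    // Hypothetical property name; not an actual Flink configuration key.
    static final String MODE_PROPERTY = "example.execution.mode";

    private static JobExecutor injected; // e.g. set by a test harness

    private DefaultEnvironment() {}

    static void injectExecutor(JobExecutor executor) { injected = executor; }

    static JobExecutor resolveExecutor() {
        if (injected != null) {
            return injected; // explicit injection wins over the system property
        }
        String mode = System.getProperty(MODE_PROPERTY, "local");
        switch (mode) {
            case "local":
                return name -> "ran " + name + " locally";
            case "remote":
                return name -> "submitted " + name + " to cluster";
            default:
                throw new IllegalStateException("unknown execution mode: " + mode);
        }
    }

    /** The job code calls this unchanged; how it executes is decided externally. */
    static String execute(String jobName) { return resolveExecutor().run(jobName); }
}
```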

Regardless, if you were to use the RestClusterClient explicitly,
then yes, you would currently be accessing rather obscure semi-internal
code (which is very much subject to change).

But encapsulating this into an Execution.executeRemotely(env, host, port,
savepointRestoreSettings) method (as a replacement for execute()) would
be feasible, IMO.
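A rough sketch of what such a helper's shape might be, using hypothetical stand-ins (`PipelineStub`, `SubmissionRequest`) and simplifying the savepoint settings to a nullable path. It only assembles the submission request instead of contacting a cluster; it illustrates the proposed signature, not an existing Flink method.

```java
import java.util.Objects;

// Hypothetical stand-in for a configured environment/pipeline.
final class PipelineStub {
    final String jobName;

    PipelineStub(String jobName) { this.jobName = jobName; }
}

/** Hypothetical description of what would be sent to the cluster. */
final class SubmissionRequest {
    final String jobName;
    final String address;
    final String restorePath; // null: no savepoint restore

    SubmissionRequest(String jobName, String address, String restorePath) {
        this.jobName = jobName;
        this.address = address;
        this.restorePath = restorePath;
    }
}

/** Hypothetical static facade mirroring the Execution.executeRemotely(...) idea. */
final class Execution {
    private Execution() {}

    static SubmissionRequest executeRemotely(PipelineStub env, String host, int port, String savepointPath) {
        Objects.requireNonNull(env, "env");
        Objects.requireNonNull(host, "host");
        // A real implementation would build the job graph, attach restore settings and submit it.
        return new SubmissionRequest(env.jobName, host + ":" + port, savepointPath);
    }
}
```

Keeping submission details in a separate call, rather than in the environment, is what avoids mixing them into the job definition.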

On 29.11.2018 17:00, Thomas Weise wrote:


Re: Submitting job with savepoint through StreamExecutionEnvironment

Thomas Weise
I agree with your take regarding the superficial distinction between stream
environments and the difficulties it introduces for users.

To fix the immediate issue in Beam, it was necessary to duplicate
RemoteStreamEnvironment.executeRemotely:

https://github.com/apache/beam/pull/7169/files#diff-6acb0479d563cfc121ac04e789f4bc6dR294

To address this in Flink, it would make sense to turn that piece of code
into a utility method that can be called directly, and also from
RemoteStreamEnvironment.execute for compatibility.
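The proposed refactoring can be sketched as follows, assuming hypothetical names throughout (`RemoteSubmitUtil`, `CompatRemoteEnvironment`, `GraphStub`): one shared static method accepts optional restore settings, runners call it directly, and `execute()` keeps its existing behavior by delegating with no restore path. The list of submitted graphs stands in for a real cluster.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a job graph carrying optional restore settings.
final class GraphStub {
    final String jobName;
    final String restorePath; // null: start without restoring

    GraphStub(String jobName, String restorePath) {
        this.jobName = jobName;
        this.restorePath = restorePath;
    }
}

final class RemoteSubmitUtil {
    static final List<GraphStub> SUBMITTED = new ArrayList<>(); // stands in for the cluster

    private RemoteSubmitUtil() {}

    /** Shared utility: callable directly by runners that need savepoint restore. */
    static void executeRemotely(String jobName, String restorePath) {
        SUBMITTED.add(new GraphStub(jobName, restorePath));
    }
}

/** Existing public behavior is preserved: execute() delegates with "no restore". */
class CompatRemoteEnvironment {
    void execute(String jobName) { RemoteSubmitUtil.executeRemotely(jobName, null); }
}
```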

If there isn't any other feedback then I would create a JIRA and work on
this.

Thanks,
Thomas

On Thu, Nov 29, 2018 at 10:00 AM Chesnay Schepler <[hidden email]> wrote: