Caveats of multi-execute() Flink programs

Caveats of multi-execute() Flink programs

Aljoscha Krettek
Hi,

I'm trying to stop derailing the discussion in FLINK-14807 [1] with a
tangent that I started but didn't want to take that far. Let's try and
continue that discussion here if we need to.

So far, the timeline is this:

Stephan:
Have we ever considered putting the job name onto the Environment,
rather than in "execute()" or "collect()"?

Aljoscha:
I think tying the job name to the environment would be problematic. I
have some thoughts on this but they span multiple things and are a bit
more general than this issue. I'll post them here nevertheless:

One environment can spawn multiple jobs if you call execute() multiple
times. For batch jobs this is sometimes not a problem. It becomes a
problem when the component (or user) that runs the Flink program expects
there to be only one job. For example, if you run bin/flink run
--fromSavepoint, which execute() call should "pick up" the savepoint?
Currently it is the first execute() call that happens, and this might or
might not work depending on whether the savepoint is the right one for
that job. Subsequent execute() calls will also try to restore from that
savepoint which, again, might or might not work.
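
To make the ambiguity concrete, here is a minimal sketch (not from the
original mails) of a program with two execute() calls:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MultiExecuteExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Job 1: today, a savepoint passed via "bin/flink run
        // --fromSavepoint" is picked up by this first execute() call.
        env.fromElements(1, 2, 3).map(i -> i * 2).print();
        env.execute("job-a");

        // Job 2: this execute() will currently also try to restore from
        // that same savepoint, which may or may not work for this job.
        env.fromElements(4, 5, 6).map(i -> i + 1).print();
        env.execute("job-b");
    }
}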

Another scenario where this is problematic is "driver mode", that is, a
mode where we run the main() method on the "JobManager", for example in
the per-job standalone entrypoint or other potential future modes where
the main() method runs in the cluster.

In general, I now think that the "execute-style" of writing jobs does
not work well for streaming programs and we might have to re-introduce
an interface like

interface FlinkJob {
   Pipeline getPipeline();
}
for streaming scenarios.
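
As a rough sketch of what a user program could look like with such an
interface (FlinkJob is only the proposal above, not existing API;
returning env.getStreamGraph() assumes StreamGraph, which implements
Flink's internal Pipeline interface, as the pipeline):

import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyStreamingJob implements FlinkJob {
    @Override
    public Pipeline getPipeline() {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
           .map(String::toUpperCase)
           .print();

        // No execute() call here: the framework receives the pipeline
        // and decides itself when and where to run it.
        return env.getStreamGraph();
    }
}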

Kostas:
I have to think about the whole issue more, but definitely an interface
like the one Aljoscha Krettek described (like the old and now deleted
Program) could at least make easier a lot of things. Currently, the only
"entrypoint" of the framework to the user code is the execute() method
and this often limits our alternatives for implementing things.

Stephan:
I would like to understand this a bit more. Having two ways of doing
things is always tricky - more complexity for maintainers, harder for
users to understand, etc.

Regarding savepoint resuming - fair enough, I see that this has subtle
semantics. Since the savepoint is a parameter on the executor (the
environment), would it be fair to say that the first execution always
uses that savepoint? If you want the driver to have independent jobs
resuming from the same savepoint, you need different environments. That
sounds like quite well-defined behavior.
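
A hypothetical sketch of that idea (there is currently no convenient
public API for attaching a savepoint to an individual environment from
inside main(); the getExecutionEnvironment(Configuration) construction
below is illustrative, though "execution.savepoint.path" is a real
option key):

Configuration confA = new Configuration();
confA.setString("execution.savepoint.path", "s3://savepoints/savepoint-x");
StreamExecutionEnvironment envA =
    StreamExecutionEnvironment.getExecutionEnvironment(confA); // illustrative

Configuration confB = new Configuration();
confB.setString("execution.savepoint.path", "s3://savepoints/savepoint-x");
StreamExecutionEnvironment envB =
    StreamExecutionEnvironment.getExecutionEnvironment(confB); // illustrative

// Each environment carries its own restore settings (here the same
// savepoint path), so each execute() call restores independently.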

For the "driver mode" or the "run main() in cluster", I don't fully
understand the issues. This should work the same way that the often
discussed "library mode" works. One thing that has caused frequent
confusion about these issues here is the assumption that somehow the
"execute()" method needs to produce a job graph that can be passed to
another component that was previously started. I think that is not
necessary, the "execute()" method would just inside create a JobMaster
(against a ResourceManager that is created in the environment) and block
while this JobMaster is executing.
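
In pseudo-code (purely illustrative; none of these helpers exist as
such in Flink), execute() in that model would do roughly:

// inside execute(), pseudo-code only:
jobGraph  = translateToJobGraph(pipeline)
jobMaster = createJobMaster(jobGraph, environment.resourceManager())
jobMaster.start()
jobMaster.awaitTermination()  // execute() blocks until the job finishes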

Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-14807
Re: Caveats of multi-execute() Flink programs

Aljoscha Krettek
@Stephan: yes, I agree about how embedded (and driver) mode should work.
I have a PoC from last year that does just that:
https://github.com/aljoscha/flink/commit/ba0931cd307c5e7e68e903318e116ed550bb20eb#diff-7752e46061b390a7a71328dac4be54ff

But that doesn't solve the weirdness around specifying a savepoint on
the CLI. Passing multiple savepoints to individual environments would
be necessary, but I don't know what a good solution would look like.

To me, it feels like multi-execute() only makes sense for batch programs.

Best,
Aljoscha

Re: Caveats of multi-execute() Flink programs

Kostas Kloudas
Hi all,

I agree with Stephan that it is reasonable to assume that multiple
independent jobs should not be in the same "main()", "application",
"program", or environment.

But if/when DataStream subsumes DataSet, batch programs will also have
checkpoints/savepoints, and in that case it is not unreasonable to
assume that users will write programs with a flow like:

res = statefulJobA.collect()
if (condition in statefulJobA)
    statefulJobB
else
    statefulJobC

Here there is an explicit "happens-before" dependency, but a savepoint,
or even a checkpoint, should still know which job it belongs to so
that, e.g., Flink knows that jobA is done and we are executing B or C.
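
Spelled out as forward-looking DataStream code, such a flow might look
like the sketch below; executeAndCollect() stands in for the collect()
above and assumes a unified API where a bounded stateful job can return
its result to the main() method:

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Stateful job A runs as a bounded job and hands its result back.
List<Integer> res = env.fromElements(1, 2, 3)
    .map(i -> i * 2)
    .executeAndCollect(10);

if (res.contains(4)) {           // "condition in statefulJobA"
    env.fromElements("b").print();
    env.execute("statefulJobB"); // which job does a checkpoint belong to?
} else {
    env.fromElements("c").print();
    env.execute("statefulJobC");
}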

In addition, I think that, to some extent, the decision we make about
seeing the above as a single job (one JobID) or as many jobs (multiple
JobIDs) will have implications for how the user can interact with it
through the client.

Cheers,
Kostas
