Hi,
I'm trying to stop derailing the discussion in FLINK-14807 [1], which I started but didn't want to take that far. Let's try and continue it here if we need to.

So far, the timeline is this:

Stephan:
Have we ever considered putting the job name onto the Environment, rather than in "execute()" or "collect()"?

Aljoscha:
I think tying the job name to the environment would be problematic. I have some thoughts on this but they span multiple things and are a bit more general than this issue. I'll post them here nevertheless:

One environment can spawn multiple jobs if you call execute() multiple times. For batch jobs this is sometimes not a problem. It becomes a problem when the component (or user) that runs the Flink program expects there to be only one job. For example, if you bin/flink run --fromSavepoint, which execute() should "pick up" the savepoint? Currently it will be the first execute() call that happens; this might or might not work, depending on whether it's the right savepoint for that one. Subsequent execute() calls will also try to restore from that savepoint, which, again, might or might not work.

Another scenario where this will be problematic is "driver mode", or a mode where we run the main() method on the "JobManager", for example in the per-job standalone entrypoint or potential future modes where the main() method is run in the cluster.

In general, I now think that the "execute-style" of writing jobs does not work well for streaming programs, and we might have to re-introduce an interface like

interface FlinkJob {
    Pipeline getPipeline();
}

for streaming scenarios.

Kostas:
I have to think about the whole issue more, but definitely an interface like the one Aljoscha Krettek described (like the old and now deleted Program) could at least make a lot of things easier. Currently, the only "entrypoint" of the framework into the user code is the execute() method, and this often limits our alternatives for implementing things.

Stephan:
I would like to understand this a bit more. Having two ways of doing things is always tricky - more complexity for maintainers, harder to understand for users, etc.

Given the savepoint resuming - fair enough, I see that this is something that has subtle semantics. Since that is a parameter on the executor (the environment), would it be fair to say that the first execution always uses that savepoint? If you want the driver to have independent jobs resuming from the same savepoint, you need different environments. That sounds like quite well-defined behavior.

For "driver mode" or "run main() in the cluster", I don't fully understand the issues. This should work the same way that the often-discussed "library mode" works. One thing that has caused frequent confusion here is the assumption that somehow the execute() method needs to produce a job graph that can be passed to another component that was previously started. I think that is not necessary; the execute() method would just create a JobMaster internally (against a ResourceManager that is created in the environment) and block while this JobMaster is executing.

Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-14807
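To make the proposed interface a bit more concrete, here is a minimal sketch of what a streaming program could look like if it only assembled a pipeline instead of calling execute() itself. FlinkJob does not exist in Flink today, and returning the StreamGraph (which implements Pipeline in recent Flink versions) is just one conceivable way such an interface might be wired up; all names below are assumptions for illustration.

import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical sketch: the user only describes the dataflow; the framework
// decides when to run it and which savepoint (if any) to restore from.
public class MyStreamingJob implements FlinkJob {

    @Override
    public Pipeline getPipeline() {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Assemble the pipeline, but do not call execute() here.
        env.socketTextStream("localhost", 9999)
           .print();

        // In current Flink the StreamGraph implements Pipeline; this is just
        // one possible way to hand the dataflow back to the framework.
        return env.getStreamGraph();
    }
}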
@Stephan: yes, I agree about how embedded (and driver) mode should work.
I have a PoC from last year that does just that:
https://github.com/aljoscha/flink/commit/ba0931cd307c5e7e68e903318e116ed550bb20eb#diff-7752e46061b390a7a71328dac4be54ff

But that doesn't solve the weirdness around specifying a savepoint on the CLI. Passing multiple savepoints to individual environments would be necessary, but I don't know what would be a good solution.

To me, it feels like multi-execute() only makes sense for batch programs.

Best,
Aljoscha
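To make the CLI weirdness concrete, here is an illustrative sketch of a program with two execute() calls (class and job names are made up). If it is started via bin/flink run --fromSavepoint <path>, the savepoint is configured once for the whole program, so it is ambiguous which of the two jobs it should apply to; as described above, today the first execute() picks it up and later calls also try to restore from it.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoExecutesProgram {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3).print();
        env.execute("job one");   // currently restores from the CLI savepoint

        env.fromElements(4, 5, 6).print();
        env.execute("job two");   // also tries to restore from the same savepoint
    }
}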
Hi all,
I agree with Stephan that it is reasonable to assume that multiple independent jobs should not be in the same "main()" or "application" or "program" or environment. But if/when DataStream subsumes DataSet, then batch programs will also have checkpoints/savepoints, and in that case it is not unreasonable to assume that users will write programs with a flow like:

res = statefulJobA.collect()
if (condition in statefulJobA)
    statefulJobB
else
    statefulJobC

Here there is an explicit "happens-before" dependency, but still a savepoint, or even a checkpoint, should know where it belongs so that, e.g., Flink knows that jobA is done and we are executing B or C.

In addition, I think that to some extent the decision we make on seeing the above as a single job (one jobId) or many jobs (multiple jobIds) will have implications on how the user can interact with it through the client.

Cheers,
Kostas
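As a concrete, hypothetical instance of the flow Kostas sketches above, here is a small program using the existing DataSet API, where collect() triggers a first job and returns its result; the names, data, and condition are made up for illustration.

import java.util.List;

import org.apache.flink.api.java.ExecutionEnvironment;

public class ConditionalJobsProgram {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // "statefulJobA": collect() implicitly submits and runs a first job.
        List<Long> res = env.fromElements(1L, 2L, 3L)
                .reduce(Long::sum)
                .collect();

        // The result of the first job decides which follow-up job is submitted,
        // so a savepoint/checkpoint would have to know which job it belongs to.
        if (res.get(0) > 3L) {
            env.fromElements("B").writeAsText("/tmp/jobB-out");   // "statefulJobB"
        } else {
            env.fromElements("C").writeAsText("/tmp/jobC-out");   // "statefulJobC"
        }
        env.execute("follow-up job");
    }
}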