[DISCUSS] FLIP-73: Introducing Executors for job submission

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

Kostas Kloudas-5
Hi again,

I did not include this in my previous email, as it relates to the
proposal in the FLIP itself.

In the existing proposal, the Executor interface is the following.

public interface Executor {

  JobExecutionResult execute(Pipeline pipeline) throws Exception;

}

This implies that all the information necessary for the execution of a
Pipeline has to be included in the Configuration passed to the
ExecutorFactory which instantiates the Executor itself. This would
include, for example, all the parameters currently supplied by the
ProgramOptions, which are conceptually not executor parameters but
rather parameters for the execution of the specific pipeline. To this
end, I would like to propose a change to the current Executor
interface, showcased below:


public interface Executor {

  JobExecutionResult execute(Pipeline pipeline, Configuration executionOptions) throws Exception;

}

The above allows the Executor-specific options to be passed in
the configuration given during executor instantiation, while the
pipeline-specific options are passed in the executionOptions. As a
positive side effect, this makes Executors reusable, i.e. we can
instantiate an executor and use it to execute multiple pipelines, if
in the future we choose to do so.
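
To illustrate the reuse described above, here is a minimal, self-contained sketch. It is not Flink code: the Pipeline class, the String result standing in for JobExecutionResult, the Map standing in for Configuration, the LoggingExecutor class, and the option keys "target" and "parallelism" are all hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReusableExecutorSketch {

    // Hypothetical stand-in for Flink's Pipeline type.
    public static final class Pipeline {
        final String name;

        public Pipeline(String name) {
            this.name = name;
        }
    }

    // Mirrors the proposed two-argument signature; a String replaces
    // JobExecutionResult and a Map replaces Configuration (assumptions).
    public interface Executor {
        String execute(Pipeline pipeline, Map<String, String> executionOptions) throws Exception;
    }

    // Hypothetical executor: executor-level options are fixed at construction.
    public static final class LoggingExecutor implements Executor {
        private final Map<String, String> executorOptions;

        public LoggingExecutor(Map<String, String> executorOptions) {
            this.executorOptions = new HashMap<>(executorOptions);
        }

        @Override
        public String execute(Pipeline pipeline, Map<String, String> executionOptions) {
            // Pipeline-level options arrive per call, so one instance serves many pipelines.
            return pipeline.name + "@" + executorOptions.get("target")
                    + " parallelism=" + executionOptions.getOrDefault("parallelism", "1");
        }
    }

    // Instantiates one executor and uses it for two pipelines with different options.
    public static List<String> runAll() {
        Map<String, String> executorOptions = new HashMap<>();
        executorOptions.put("target", "yarn");
        Executor executor = new LoggingExecutor(executorOptions);

        Map<String, String> firstOptions = new HashMap<>();
        firstOptions.put("parallelism", "4");

        List<String> results = new ArrayList<>();
        try {
            results.add(executor.execute(new Pipeline("wordcount"), firstOptions));
            results.add(executor.execute(new Pipeline("pagerank"), new HashMap<>()));
        } catch (Exception e) {
            throw new IllegalStateException("submission failed", e);
        }
        return results;
    }

    public static void main(String[] args) {
        runAll().forEach(System.out::println);
    }
}
```

With the single-argument signature, the "parallelism" value would have to be fixed when the executor is created, so each pipeline would need its own executor instance.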

Let me know what you think,
Kostas

On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <[hidden email]> wrote:

>
> Hi all,
>
> I agree with Tison that we should disentangle threads so that people
> can work independently.
>
> For FLIP-73:
>  - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
> Executors work, as they are using the execute() method because this is
> the only "entry" to the user program. In this regard, I believe we
> should just see the fact that they have their dedicated environment as
> an "implementation detail".
>  - for getting rid of the per-job mode: as a first note, there was
> already a discussion here:
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> with many people, including myself, expressing their opinion. I am
> mentioning that to show that this topic already has some history and
> the discussion does not start from scratch, but there are already some
> contradicting opinions. My opinion is that we should not get rid of
> the per-job mode, but I agree that we should discuss the
> semantics in more detail. Although in terms of code it may be tempting
> to "merge" the two submission modes, one of the main benefits of the
> per-job mode is isolation, both for resources and security, as the
> jobGraph to be executed is fixed and the cluster is "locked" just for
> that specific graph. This would be violated by launching a session
> cluster with all the infrastructure (ports and endpoints) set up for
> submitting any job to that cluster.
> - for getting rid of the "detached" mode: I agree with getting rid of
> it but this implies some potential user-facing changes that should be
> discussed.
>
> Given the above, I think that:
> 1) in the context of FLIP-73 we should not change any semantics but
> simply push the existing submission logic behind a reusable
> abstraction and make it usable via public APIs, as Aljoscha said.
> 2) as Till said, changing the semantics is beyond the scope of this
> FLIP and as Tison mentioned we should work towards decoupling
> discussions rather than the opposite. So let's discuss the
> future of the per-job and detached modes in a separate thread. This
> will also give proper visibility to such an important
> topic.
>
> Cheers,
> Kostas
>
> On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <[hidden email]> wrote:
> >
> > Thanks for your thoughts Aljoscha.
> >
> > Another question, since FLIP-73 might contain refactorings of the
> > Environment: shall we support something like PreviewPlanEnvironment?
> > If so, how? From a user perspective, the preview plan is useful: it
> > gives a visual view, so that users can modify topologies and tune
> > configuration without submitting the job.
> >
> > Best,
> > tison.
> >
> >
> > On Wed, Oct 2, 2019 at 10:10 PM Aljoscha Krettek <[hidden email]> wrote:
> >
> > > I agree with Till that we should not change the semantics of per-job mode.
> > > In my opinion per-job mode means that the cluster (JobManager) is brought
> > > up with one job and it only executes that one job. There should be no open
> > > ports/anything that would allow submitting further jobs. This is very
> > > important for deployments in docker/Kubernetes or other environments where
> > > you bring up jobs without necessarily having the notion of a Flink cluster.
> > >
> > > What this means for a user program that has multiple execute() calls is
> > > that you will get a fresh cluster for each execute call. This also means,
> > > that further execute() calls will only happen if the “client” is still
> > > alive, because it is the one driving execution. Currently, this only works
> > > if you start the job in “attached” mode. If you start in “detached” mode
> > > only the first execute() will happen and the rest will be ignored.
> > >
> > > This brings us to the tricky question about what to do about “detached”
> > > and “attached”. In the long run, I would like to get rid of the distinction
> > > and leave it up to the user program, by either blocking or not on the
> > > Future (or JobClient or whatnot) that job submission returns. This,
> > > however, means that users cannot simply request “detached” execution when
> > > using bin/flink, the user program has to “play along”. On the other hand,
> > > “detached” mode is quite strange for the user program. The execute() call
> > > either returns with a proper job result after the job ran (in “attached”
> > > mode) or with a dummy result (in “detached” mode) right after submission. I
> > > think this can even lead to weird cases where multiple “execute()” run in
> > > parallel. For per-job detached mode we also “throw” out of the first
> > > execute so the rest (including result processing logic) is ignored.
> > >
> > > For this here FLIP-73 we can (and should) ignore these problems, because
> > > FLIP-73 only moves the existing submission logic behind a reusable
> > > abstraction and makes it usable via API. We should closely follow up on the
> > > above points though because I think they are also important.
> > >
> > > Best,
> > > Aljoscha
> > >
> > > > On 2. Oct 2019, at 12:08, Zili Chen <[hidden email]> wrote:
> > > >
> > > > Thanks for your clarification Till.
> > > >
> > > > I agree with the current semantics of the per-job mode: one should
> > > > deploy a new cluster for each part of the job. Apart from the
> > > > performance concern, it also means that PerJobExecutor actually knows
> > > > how to deploy a cluster, which differs from the description that an
> > > > Executor submits a job.
> > > >
> > > > Anyway, it sounds workable and narrows the changes.
> > >
> > >
Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

tison
Hi Kostas,

It seems to do no harm to have a configuration parameter on Executor#execute,
since we can merge it with the configuration given when the Executor was
created and let the per-call one override the other.

I can see that it is conceptually useful to create one Executor for a
series of jobs submitted to the same cluster, with a different job
configuration per pipeline.
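
A minimal sketch of this merge-and-override idea, assuming plain string maps in place of Flink's Configuration. The class name, the method name effectiveConfig, and the keys "target" and "parallelism" are hypothetical, chosen only for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigMergeSketch {

    // Returns a new map: executor-level settings first, then the per-call
    // execution options, which win whenever both define the same key.
    public static Map<String, String> effectiveConfig(
            Map<String, String> executorConfig, Map<String, String> executionOptions) {
        Map<String, String> merged = new HashMap<>(executorConfig);
        merged.putAll(executionOptions);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> executorConfig = new HashMap<>();
        executorConfig.put("target", "yarn-session");
        executorConfig.put("parallelism", "2");

        Map<String, String> perJob = new HashMap<>();
        perJob.put("parallelism", "8");

        // "parallelism" comes from perJob, "target" from executorConfig.
        System.out.println(effectiveConfig(executorConfig, perJob));
    }
}
```

Copying into a fresh map leaves the executor-level configuration untouched, so the same Executor can be reused for the next pipeline with different options.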

Best,
tison.


Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

tison
BTW, correct me if I misunderstand; I am still learning how our community
works. Since FLIP-73 aims at introducing an interface with community
consensus, the discussion is mostly about the interface itself, in order to
properly define a useful and extensible API. The integration story could be
a follow-up, since the interface alone does not affect current behavior at all.

Best,
tison.


Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

tison
>  - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
> Executors work, as they are using the execute() method because this is
> the only "entry" to the user program. In this regard, I believe we
> should just see the fact that they have their dedicated environment as
> an "implementation detail".

The proposal says

In this document, we propose to abstract away from the Environments the job
submission logic and put it in a newly introduced Executor. This will allow
*each API to have a single Environment* which, based on the provided
configuration, will decide which executor to use, *e.g.* Yarn, Local, etc.
In addition, it will allow different APIs and downstream projects to re-use
the provided executors, thus limiting the amount of code duplication and
the amount of code that has to be written.

Note the phrase "This will allow *each API to have a single Environment*":
it seems to diverge a bit from your statement above. Or do we regard a
single Environment as a possible advantage that follows the introduction
of Executors, so that we exclude it from this pass?
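
For reference, the idea quoted from the proposal (one Environment that picks an executor based on configuration) could be sketched roughly as follows. This is only an illustration under stated assumptions: the key "executor.target", the factory registry, and the String-based pipeline and result types are hypothetical and not taken from the FLIP.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SingleEnvironmentSketch {

    // Stand-in for the Executor interface; the result is a plain String here.
    public interface Executor {
        String execute(String pipeline);
    }

    // Hypothetical registry mapping a target name to an executor factory.
    private static final Map<String, Function<Map<String, String>, Executor>> FACTORIES =
            new HashMap<>();

    static {
        FACTORIES.put("local", conf -> pipeline -> "local:" + pipeline);
        FACTORIES.put("yarn", conf -> pipeline -> "yarn:" + pipeline);
    }

    // The single Environment: it holds configuration and delegates submission.
    public static final class Environment {
        private final Map<String, String> configuration;

        public Environment(Map<String, String> configuration) {
            this.configuration = new HashMap<>(configuration);
        }

        public String execute(String pipeline) {
            // "executor.target" is an assumed key name; it defaults to "local".
            String target = configuration.getOrDefault("executor.target", "local");
            Function<Map<String, String>, Executor> factory = FACTORIES.get(target);
            if (factory == null) {
                throw new IllegalArgumentException("No executor for target: " + target);
            }
            return factory.apply(configuration).execute(pipeline);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("executor.target", "yarn");
        System.out.println(new Environment(conf).execute("wordcount"));
    }
}
```

In such a design a dedicated environment like PreviewPlanEnvironment would either remain a separate class or become one more entry in the registry, which is the open point raised here.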

Best,
tison.


Zili Chen <[hidden email]> 于2019年10月3日周四 上午2:07写道:

> BTW, correct me if I misunderstand, now I learn more about our community
> way. Since FLIP-73 aimed at introducing an interface with community
> consensus the discussion is more about the interface in order to properly
> define a useful and extensible API. The integration story could be a
> follow up
> since this one does not affect current behavior at all.
>
> Best,
> tison.
>
>
> Zili Chen <[hidden email]> 于2019年10月3日周四 上午2:02写道:
>
>> Hi Kostas,
>>
>> It seems does no harm we have a configuration parameter of
>> Executor#execute
>> since we can merge this one with the one configured on Executor created
>> and
>> let this one overwhelm that one.
>>
>> I can see it is useful that conceptually we can create an Executor for a
>> series jobs
>> to the same cluster but with different job configuration per pipeline.
>>
>> Best,
>> tison.
>>
>>
>> Kostas Kloudas <[hidden email]> 于2019年10月3日周四 上午1:37写道:
>>
>>> Hi again,
>>>
>>> I did not include this to my previous email, as this is related to the
>>> proposal on the FLIP itself.
>>>
>>> In the existing proposal, the Executor interface is the following.
>>>
>>> public interface Executor {
>>>
>>>   JobExecutionResult execute(Pipeline pipeline) throws Exception;
>>>
>>> }
>>>
>>> This implies that all the necessary information for the execution of a
>>> Pipeline should be included in the Configuration passed in the
>>> ExecutorFactory which instantiates the Executor itself. This should
>>> include, for example, all the parameters currently supplied by the
>>> ProgramOptions, which are conceptually not executor parameters but
>>> rather parameters for the execution of the specific pipeline. To this
>>> end, I would like to propose a change in the current Executor
>>> interface showcased below:
>>>
>>>
>>> public interface Executor {
>>>
>>>   JobExecutionResult execute(Pipeline pipeline, Configuration
>>> executionOptions) throws Exception;
>>>
>>> }
>>>
>>> The above will allow to have the Executor specific options passed in
>>> the configuration given during executor instantiation, while the
>>> pipeline specific options can be passed in the executionOptions. As a
>>> positive side-effect, this will make Executors re-usable, i.e.
>>> instantiate an executor and use it to execute multiple pipelines, if
>>> in the future we choose to do so.
>>>
>>> Let me know what do you think,
>>> Kostas
>>>
>>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <[hidden email]>
>>> wrote:
>>> >
>>> > Hi all,
>>> >
>>> > I agree with Tison that we should disentangle threads so that people
>>> > can work independently.
>>> >
>>> > For FLIP-73:
>>> >  - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
>>> > Executors work, as they are using the exexute() method because this is
>>> > the only "entry" to the user program. To this regard, I believe we
>>> > should just see the fact that they have their dedicated environment as
>>> > an "implementation detail".
>>> >  - for getting rid of the per-job mode: as a first note, there was
>>> > already a discussion here:
>>> >
>>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>>> > with many people, including myself, expressing their opinion. I am
>>> > mentioning that to show that this topic already has some history and
>>> > the discussin does not start from scratch but there are already some
>>> > contradicting opinions. My opinion is that we should not get rid of
>>> > the per-job mode but I agree that we should discuss about the
>>> > semantics in more detail. Although in terms of code it may be tempting
>>> > to "merge" the two submission modes, one of the main benefits of the
>>> > per-job mode is isolation, both for resources and security, as the
>>> > jobGraph to be executed is fixed and the cluster is "locked" just for
>>> > that specific graph. This would be violated by having a session
>>> > cluster launched and having all the infrastrucutre (ports and
>>> > endpoints) set for submittting to that cluster any job.
>>> > - for getting rid of the "detached" mode: I agree with getting rid of
>>> > it but this implies some potential user-facing changes that should be
>>> > discussed.
>>> >
>>> > Given the above, I think that:
>>> > 1) in the context of FLIP-73 we should not change any semantics but
>>> > simply push the existing submission logic behind a reusable
>>> > abstraction and make it usable via public APIs, as Aljoscha said.
>>> > 2) as Till said, changing the semantics is beyond the scope of this
>>> > FLIP and as Tison mentioned we should work towards decoupling
>>> > discussions rather than the opposite. So let's discuss about the
>>> > future of the per-job and detached modes in a separate thread. This
>>> > will also allow to give the proper visibility to such an important
>>> > topic.
>>> >
>>> > Cheers,
>>> > Kostas
>>> >
>>> > On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <[hidden email]> wrote:
>>> > >
>>> > > Thanks for your thoughts Aljoscha.
>>> > >
>>> > > Another question since FLIP-73 might contains refactors on
>>> Environemnt:
>>> > > shall we support
>>> > > something like PreviewPlanEnvironment? If so, how? From a user
>>> perspective
>>> > > preview plan
>>> > > is useful, by give visual view, to modify topos and configure without
>>> > > submit it.
>>> > >
>>> > > Best,
>>> > > tison.
>>> > >
>>> > >
>>> > > Aljoscha Krettek <[hidden email]> 于2019年10月2日周三 下午10:10写道:
>>> > >
>>> > > > I agree with Till that we should not change the semantics of
>>> per-job mode.
>>> > > > In my opinion per-job mode means that the cluster (JobManager) is
>>> brought
>>> > > > up with one job and it only executes that one job. There should be
>>> no open
>>> > > > ports/anything that would allow submitting further jobs. This is
>>> very
>>> > > > important for deployments in docker/Kubernetes or other
>>> environments were
>>> > > > you bring up jobs without necessarily having the notion of a Flink
>>> cluster.
>>> > > >
>>> > > > What this means for a user program that has multiple execute()
>>> calls is
>>> > > > that you will get a fresh cluster for each execute call. This also
>>> means,
>>> > > > that further execute() calls will only happen if the “client” is
>>> still
>>> > > > alive, because it is the one driving execution. Currently, this
>>> only works
>>> > > > if you start the job in “attached” mode. If you start in
>>> “detached” mode
>>> > > > only the first execute() will happen and the rest will be ignored.
>>> > > >
>>> > > > This brings us to the tricky question about what to do about
>>> “detached”
>>> > > > and “attached”. In the long run, I would like to get rid of the
>>> distinction
>>> > > > and leave it up to the user program, by either blocking or not on
>>> the
>>> > > > Future (or JobClient or whatnot) that job submission returns. This,
>>> > > > however, means that users cannot simply request “detached”
>>> execution when
>>> > > > using bin/flink, the user program has to “play along”. On the
>>> other hand,
>>> > > > “detached” mode is quite strange for the user program. The
>>> execute() call
>>> > > > either returns with a proper job result after the job ran (in
>>> “attached”
>>> > > > mode) or with a dummy result (in “detached” mode) right after
>>> submission. I
>>> > > > think this can even lead to weird cases where multiple "execute()”
>>> run in
>>> > > > parallel. For per-job detached mode we also “throw” out of the
>>> first
>>> > > > execute so the rest (including result processing logic) is ignored.
>>> > > >
>>> > > > For this here FLIP-73 we can (and should) ignore these problems,
>>> because
>>> > > > FLIP-73 only moves the existing submission logic behind a reusable
>>> > > > abstraction and makes it usable via API. We should closely follow
>>> up on the
>>> > > > above points though because I think they are also important.
>>> > > >
>>> > > > Best,
>>> > > > Aljoscha
>>> > > >
>>> > > > > On 2. Oct 2019, at 12:08, Zili Chen <[hidden email]>
>>> wrote:
>>> > > > >
>>> > > > > Thanks for your clarification Till.
>>> > > > >
>>> > > > > I agree that, with the current semantics of the per-job mode, one
>>> > > > > should deploy a new cluster for each part of the job. Apart from the
>>> > > > > performance concern, it also means that PerJobExecutor actually knows
>>> > > > > how to deploy a cluster, which differs from the description that an
>>> > > > > Executor submits a job.
>>> > > > >
>>> > > > > Anyway, it sounds workable and narrows the changes.
>>> > > >
>>> > > >
>>>
>>

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

Kostas Kloudas-4
Hi Tison,

The changes that this FLIP proposes are:
- the introduction of the Executor interface
- the fact that everything in the current state of job submission in
Flink can be defined through configuration parameters
- implementation of Executors that do not change any of the semantics
of the currently offered "modes" of job submission
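
As a rough illustration of the first two points (not code from the FLIP), the sketch below uses the two-argument execute() form proposed earlier in this thread. Pipeline, the String result, the Map-based configuration, the createExecutor helper and the "target"/"parallelism" keys are all hypothetical stand-ins, and letting per-pipeline options override executor-level ones is only one possible merge policy.

```java
import java.util.HashMap;
import java.util.Map;

public class ExecutorSketch {

    /** Hypothetical stand-in for a pipeline; a real one would hold the job graph. */
    static final class Pipeline {
        final String name;
        Pipeline(String name) { this.name = name; }
    }

    /** Two-argument shape discussed in the thread (result type simplified to String). */
    interface Executor {
        String execute(Pipeline pipeline, Map<String, String> executionOptions) throws Exception;
    }

    /** Assumed merge policy: per-pipeline options override executor-level ones. */
    static Map<String, String> merge(Map<String, String> executorConfig,
                                     Map<String, String> executionOptions) {
        Map<String, String> merged = new HashMap<>(executorConfig);
        merged.putAll(executionOptions);
        return merged;
    }

    /** Hypothetical factory: the executor is chosen purely from configuration. */
    static Executor createExecutor(Map<String, String> executorConfig) {
        String target = executorConfig.getOrDefault("target", "local");
        return (pipeline, executionOptions) -> {
            Map<String, String> effective = merge(executorConfig, executionOptions);
            // A real executor would submit the job here; this one only reports what it would do.
            return target + ":" + pipeline.name
                    + ":parallelism=" + effective.getOrDefault("parallelism", "1");
        };
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> executorConfig = new HashMap<>();
        executorConfig.put("target", "yarn");
        executorConfig.put("parallelism", "2");
        Executor executor = createExecutor(executorConfig);

        Map<String, String> perJob = new HashMap<>();
        perJob.put("parallelism", "8");

        // The same executor instance runs two pipelines with different options.
        System.out.println(executor.execute(new Pipeline("wordcount"), perJob));
        System.out.println(executor.execute(new Pipeline("pagerank"), new HashMap<>()));
    }
}
```

With this shape, one executor instance can serve several pipelines, which is the re-use argument made for the second parameter.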

This email, and the part of the FLIP that describes the
ExecutionEnvironment.execute() method, already covers parts of the
integration with the existing Flink code base.

So I am not sure what you mean by making the "integration a
follow-up discussion".

Cheers,
Kostas

On Wed, Oct 2, 2019 at 8:10 PM Zili Chen <[hidden email]> wrote:

>
>  - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
> Executors work, as they are using the execute() method because this is
> the only "entry" to the user program. In this regard, I believe we
> should just see the fact that they have their dedicated environment as
> an "implementation detail".
>
> The proposal says
>
> In this document, we propose to abstract away from the Environments the job
> submission logic and put it in a newly introduced Executor. This will
> allow *each
> API to have a single Environment* which, based on the provided
> configuration, will decide which executor to use, *e.g.* Yarn, Local, etc.
> In addition, it will allow different APIs and downstream projects to re-use
> the provided executors, thus limiting the amount of code duplication and
> the amount of code that has to be written.
>
> Note that "This will allow *each API to have a single Environment*"
> seems to diverge a bit from your statement above. Or do we treat a single
> Environment as a possible advantage that follows the introduction of
> Executors, and therefore exclude it from this pass?
>
> Best,
> tison.
>
>
> Zili Chen <[hidden email]> wrote on Thu, Oct 3, 2019 at 2:07 AM:
>
> > BTW, correct me if I misunderstand; I am still learning how our community
> > works. Since FLIP-73 aims at introducing an interface with community
> > consensus, the discussion is more about the interface, in order to properly
> > define a useful and extensible API. The integration story could be a
> > follow-up, since this one does not affect current behavior at all.
> >
> > Best,
> > tison.
> >
> >
> > Zili Chen <[hidden email]> wrote on Thu, Oct 3, 2019 at 2:02 AM:
> >
> >> Hi Kostas,
> >>
> >> It seems to do no harm to have a configuration parameter on
> >> Executor#execute, since we can merge it with the configuration given
> >> when the Executor was created and let this one override that one.
> >>
> >> I can see it being useful that, conceptually, we can create an Executor
> >> that submits a series of jobs to the same cluster but with a different
> >> job configuration per pipeline.
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> Kostas Kloudas <[hidden email]> wrote on Thu, Oct 3, 2019 at 1:37 AM:
> >>
> >>> Hi again,
> >>>
> >>> I did not include this in my previous email, as this is related to the
> >>> proposal on the FLIP itself.
> >>>
> >>> In the existing proposal, the Executor interface is the following.
> >>>
> >>> public interface Executor {
> >>>
> >>>   JobExecutionResult execute(Pipeline pipeline) throws Exception;
> >>>
> >>> }
> >>>
> >>> This implies that all the necessary information for the execution of a
> >>> Pipeline should be included in the Configuration passed in the
> >>> ExecutorFactory which instantiates the Executor itself. This should
> >>> include, for example, all the parameters currently supplied by the
> >>> ProgramOptions, which are conceptually not executor parameters but
> >>> rather parameters for the execution of the specific pipeline. To this
> >>> end, I would like to propose a change in the current Executor
> >>> interface showcased below:
> >>>
> >>>
> >>> public interface Executor {
> >>>
> >>>   JobExecutionResult execute(Pipeline pipeline, Configuration executionOptions) throws Exception;
> >>>
> >>> }
> >>>
> >>> The above allows the executor-specific options to be passed in
> >>> the configuration given during executor instantiation, while the
> >>> pipeline-specific options can be passed in the executionOptions. As a
> >>> positive side-effect, this will make Executors re-usable, i.e.
> >>> instantiate an executor and use it to execute multiple pipelines, if
> >>> in the future we choose to do so.
> >>>
> >>> Let me know what you think,
> >>> Kostas
> >>>
> >>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <[hidden email]>
> >>> wrote:
> >>> >
> >>> > Hi all,
> >>> >
> >>> > I agree with Tison that we should disentangle threads so that people
> >>> > can work independently.
> >>> >
> >>> > For FLIP-73:
> >>> >  - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
> >>> > Executors work, as they are using the execute() method because this is
> >>> > the only "entry" to the user program. In this regard, I believe we
> >>> > should just see the fact that they have their dedicated environment as
> >>> > an "implementation detail".
> >>> >  - for getting rid of the per-job mode: as a first note, there was
> >>> > already a discussion here:
> >>> >
> >>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> >>> > with many people, including myself, expressing their opinion. I am
> >>> > mentioning that to show that this topic already has some history and
> >>> > the discussion does not start from scratch but there are already some
> >>> > contradicting opinions. My opinion is that we should not get rid of
> >>> > the per-job mode but I agree that we should discuss the
> >>> > semantics in more detail. Although in terms of code it may be tempting
> >>> > to "merge" the two submission modes, one of the main benefits of the
> >>> > per-job mode is isolation, both for resources and security, as the
> >>> > jobGraph to be executed is fixed and the cluster is "locked" just for
> >>> > that specific graph. This would be violated by having a session
> >>> > cluster launched and having all the infrastructure (ports and
> >>> > endpoints) set up for submitting any job to that cluster.
> >>> > - for getting rid of the "detached" mode: I agree with getting rid of
> >>> > it but this implies some potential user-facing changes that should be
> >>> > discussed.
> >>> >
> >>> > Given the above, I think that:
> >>> > 1) in the context of FLIP-73 we should not change any semantics but
> >>> > simply push the existing submission logic behind a reusable
> >>> > abstraction and make it usable via public APIs, as Aljoscha said.
> >>> > 2) as Till said, changing the semantics is beyond the scope of this
> >>> > FLIP and as Tison mentioned we should work towards decoupling
> >>> > discussions rather than the opposite. So let's discuss the
> >>> > future of the per-job and detached modes in a separate thread. This
> >>> > will also give the proper visibility to such an important
> >>> > topic.
> >>> >
> >>> > Cheers,
> >>> > Kostas
> >>> >
> >>> > On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <[hidden email]> wrote:
> >>> > >
> >>> > > Thanks for your thoughts Aljoscha.
> >>> > >
> >>> > > Another question, since FLIP-73 might contain refactorings of
> >>> > > Environment: shall we support something like PreviewPlanEnvironment?
> >>> > > If so, how? From a user perspective, the preview plan is useful: it
> >>> > > gives a visual view for modifying topologies and configuring them
> >>> > > without submitting the job.
> >>> > >
> >>> > > Best,
> >>> > > tison.
> >>> > >
> >>> > >
> >>> > > Aljoscha Krettek <[hidden email]> wrote on Wed, Oct 2, 2019 at 10:10 PM:
> >>> > >
> >>> > > > I agree with Till that we should not change the semantics of
> >>> per-job mode.
> >>> > > > In my opinion per-job mode means that the cluster (JobManager) is
> >>> brought
> >>> > > > up with one job and it only executes that one job. There should be
> >>> no open
> >>> > > > ports/anything that would allow submitting further jobs. This is
> >>> very
> >>> > > > important for deployments in Docker/Kubernetes or other
> >>> > > > environments where you bring up jobs without necessarily having
> >>> > > > the notion of a Flink cluster.
> >>> > > >
> >>> > > > What this means for a user program that has multiple execute()
> >>> calls is
> >>> > > > that you will get a fresh cluster for each execute call. This also
> >>> means,
> >>> > > > that further execute() calls will only happen if the “client” is
> >>> still
> >>> > > > alive, because it is the one driving execution. Currently, this
> >>> only works
> >>> > > > if you start the job in “attached” mode. If you start in
> >>> “detached” mode
> >>> > > > only the first execute() will happen and the rest will be ignored.
> >>> > > >
> >>> > > > This brings us to the tricky question about what to do about
> >>> “detached”
> >>> > > > and “attached”. In the long run, I would like to get rid of the
> >>> distinction
> >>> > > > and leave it up to the user program, by either blocking or not on
> >>> the
> >>> > > > Future (or JobClient or whatnot) that job submission returns. This,
> >>> > > > however, means that users cannot simply request “detached”
> >>> execution when
> >>> > > > using bin/flink, the user program has to “play along”. On the
> >>> other hand,
> >>> > > > “detached” mode is quite strange for the user program. The
> >>> execute() call
> >>> > > > either returns with a proper job result after the job ran (in
> >>> “attached”
> >>> > > > mode) or with a dummy result (in “detached” mode) right after
> >>> submission. I
> >>> > > > think this can even lead to weird cases where multiple "execute()”
> >>> run in
> >>> > > > parallel. For per-job detached mode we also “throw” out of the
> >>> first
> >>> > > > execute so the rest (including result processing logic) is ignored.
> >>> > > >
> >>> > > > For this here FLIP-73 we can (and should) ignore these problems,
> >>> because
> >>> > > > FLIP-73 only moves the existing submission logic behind a reusable
> >>> > > > abstraction and makes it usable via API. We should closely follow
> >>> up on the
> >>> > > > above points though because I think they are also important.
> >>> > > >
> >>> > > > Best,
> >>> > > > Aljoscha
> >>> > > >
> >>> > > > > On 2. Oct 2019, at 12:08, Zili Chen <[hidden email]>
> >>> wrote:
> >>> > > > >
> >>> > > > > Thanks for your clarification Till.
> >>> > > > >
> >>> > > > > I agree that, with the current semantics of the per-job mode, one
> >>> > > > > should deploy a new cluster for each part of the job. Apart from the
> >>> > > > > performance concern, it also means that PerJobExecutor actually knows
> >>> > > > > how to deploy a cluster, which differs from the description that an
> >>> > > > > Executor submits a job.
> >>> > > > >
> >>> > > > > Anyway, it sounds workable and narrows the changes.
> >>> > > >
> >>> > > >
> >>>
> >>

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

tison
Thanks for your explanation, Kostas; it makes the subtasks under FLIP-73 clear.

As you described, changes to Environment are included in this FLIP. For
"each API to have a single Environment", it could be helpful to describe
which APIs we'd like to have after FLIP-73. And if we keep multiple
Environments, shall we keep the way the context environment is injected
for each API?


Kostas Kloudas <[hidden email]> wrote on Thu, Oct 3, 2019 at 1:44 PM:

> Hi Tison,
>
> The changes that this FLIP proposes are:
> - the introduction of the Executor interface
> - the fact that everything in the current state of job submission in
> Flink can be defined through configuration parameters
> - implementation of Executors that do not change any of the semantics
> of the currently offered "modes" of job submission
>
> This email, and the part of the FLIP that describes the
> ExecutionEnvironment.execute() method, already covers parts of the
> integration with the existing Flink code base.
>
> So I am not sure what you mean by making the "integration a
> follow-up discussion".
>
> Cheers,
> Kostas

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

Aljoscha Krettek-2
Hi,

I think the end goal is to have only one environment per API, but I think we won’t be able to achieve that in the short-term because of backwards compatibility. This is most notable with the context environment, preview environments etc.

To keep this FLIP very slim we can make this only about the executors and executor discovery. Anything else like job submission semantics, detached mode, … can be tackled after this. If we don’t focus I’m afraid this will drag on for quite a while.

One thing I would like to propose to make this easier is to change Executor.execute() to return a CompletableFuture and to completely remove the “detached” logic from ClusterClient. That way, the new components make no distinction between “detached” and “attached” but we can still do it in the CLI (via the ContextEnvironment) to support the existing “detached” behaviour of the CLI that users expect. What do you think about this?
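
A minimal sketch of that idea, under stated assumptions: the JobResult class, the String pipeline name, the asyncExecutor helper and the runAttached/runDetached methods below are hypothetical and are not Flink APIs. The point is only that the executor itself makes no attached/detached distinction; the caller (for example the CLI) decides whether to block on the returned future.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncExecutorSketch {

    /** Simplified, hypothetical stand-in for a job result. */
    static final class JobResult {
        final String jobName;
        JobResult(String jobName) { this.jobName = jobName; }
    }

    /** Executor without an attached/detached notion: it always returns a future. */
    interface Executor {
        CompletableFuture<JobResult> execute(String pipelineName);
    }

    /** Toy executor that "runs" the job asynchronously on the common pool. */
    static Executor asyncExecutor() {
        return pipelineName ->
                CompletableFuture.supplyAsync(() -> new JobResult(pipelineName));
    }

    /** "Attached" behaviour: the caller blocks until the result is available. */
    static JobResult runAttached(Executor executor, String pipelineName) {
        return executor.execute(pipelineName).join();
    }

    /** "Detached" behaviour: the caller returns right after submission with only a handle. */
    static CompletableFuture<JobResult> runDetached(Executor executor, String pipelineName) {
        return executor.execute(pipelineName);
    }

    public static void main(String[] args) {
        Executor executor = asyncExecutor();
        System.out.println(runAttached(executor, "attached-job").jobName);

        CompletableFuture<JobResult> handle = runDetached(executor, "detached-job");
        // A detached caller could stop here; joining only shows the handle is still usable.
        System.out.println(handle.join().jobName);
    }
}
```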

Best,
Aljoscha

> On 3. Oct 2019, at 10:03, Zili Chen <[hidden email]> wrote:
>
> Thanks for your explanation, Kostas; it makes the subtasks under FLIP-73 clear.
>
> As you described, changes to Environment are included in this FLIP. For
> "each API to have a single Environment", it could be helpful to describe
> which APIs we'd like to have after FLIP-73. And if we keep multiple
> Environments, shall we keep the way the context environment is injected
> for each API?
>
>
> Kostas Kloudas <[hidden email]> wrote on Thu, Oct 3, 2019 at 1:44 PM:
>
>> Hi Tison,
>>
>> The changes that this FLIP proposes are:
>> - the introduction of the Executor interface
>> - the fact that everything in the current state of job submission in
>> Flink can be defined through configuration parameters
>> - implementation of Executors that do not change any of the semantics
>> of the currently offered "modes" of job submission
>>
>> This email, and the part of the FLIP that describes the
>> ExecutionEnvironment.execute() method, already covers parts of the
>> integration with the existing Flink code base.
>>
>> So I am not sure what you mean by making the "integration a
>> follow-up discussion".
>>
>> Cheers,
>> Kostas
>>
>> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen <[hidden email]> wrote:
>>>
>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
>>> Executors work, as they are using the execute() method because this is
>>> the only "entry" to the user program. In this regard, I believe we
>>> should just see the fact that they have their dedicated environment as
>>> an "implementation detail".
>>>
>>> The proposal says
>>>
>>> In this document, we propose to abstract away from the Environments the
>> job
>>> submission logic and put it in a newly introduced Executor. This will
>>> allow *each
>>> API to have a single Environment* which, based on the provided
>>> configuration, will decide which executor to use, *e.g.* Yarn, Local,
>> etc.
>>> In addition, it will allow different APIs and downstream projects to
>> re-use
>>> the provided executors, thus limiting the amount of code duplication and
>>> the amount of code that has to be written.
>>>
>>> Note that "This will allow *each API to have a single Environment*"
>>> seems to diverge a bit from your statement above. Or do we treat a single
>>> Environment as a possible advantage that follows the introduction of
>>> Executors, and therefore exclude it from this pass?
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Zili Chen <[hidden email]> wrote on Thu, Oct 3, 2019 at 2:07 AM:
>>>
>>>> BTW, correct me if I misunderstand; I am still learning how our community
>>>> works. Since FLIP-73 aims at introducing an interface with community
>>>> consensus, the discussion is more about the interface, in order to properly
>>>> define a useful and extensible API. The integration story could be a
>>>> follow-up, since this one does not affect current behavior at all.
>>>>
>>>> Best,
>>>> tison.
>>>>
>>>>
>>>> Zili Chen <[hidden email]> wrote on Thu, Oct 3, 2019 at 2:02 AM:
>>>>
>>>>> Hi Kostas,
>>>>>
>>>>> It seems to do no harm to have a configuration parameter on
>>>>> Executor#execute, since we can merge it with the configuration given
>>>>> when the Executor was created and let this one override that one.
>>>>>
>>>>> I can see it being useful that, conceptually, we can create an Executor
>>>>> that submits a series of jobs to the same cluster but with a different
>>>>> job configuration per pipeline.
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>>
>>>>> Kostas Kloudas <[hidden email]> wrote on Thu, Oct 3, 2019 at 1:37 AM:
>>>>>
>>>>>> Hi again,
>>>>>>
>>>>>> I did not include this in my previous email, as this is related to the
>>>>>> proposal on the FLIP itself.
>>>>>>
>>>>>> In the existing proposal, the Executor interface is the following.
>>>>>>
>>>>>> public interface Executor {
>>>>>>
>>>>>>  JobExecutionResult execute(Pipeline pipeline) throws Exception;
>>>>>>
>>>>>> }
>>>>>>
>>>>>> This implies that all the necessary information for the execution of
>> a
>>>>>> Pipeline should be included in the Configuration passed in the
>>>>>> ExecutorFactory which instantiates the Executor itself. This should
>>>>>> include, for example, all the parameters currently supplied by the
>>>>>> ProgramOptions, which are conceptually not executor parameters but
>>>>>> rather parameters for the execution of the specific pipeline. To this
>>>>>> end, I would like to propose a change in the current Executor
>>>>>> interface showcased below:
>>>>>>
>>>>>>
>>>>>> public interface Executor {
>>>>>>
>>>>>>  JobExecutionResult execute(Pipeline pipeline, Configuration executionOptions) throws Exception;
>>>>>>
>>>>>> }
>>>>>>
>>>>>> The above allows the executor-specific options to be passed in
>>>>>> the configuration given during executor instantiation, while the
>>>>>> pipeline-specific options can be passed in the executionOptions. As a
>>>>>> positive side-effect, this will make Executors re-usable, i.e.
>>>>>> instantiate an executor and use it to execute multiple pipelines, if
>>>>>> in the future we choose to do so.
>>>>>>
>>>>>> Let me know what you think,
>>>>>> Kostas
>>>>>>
>>>>>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <[hidden email]>
>>>>>> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I agree with Tison that we should disentangle threads so that
>> people
>>>>>>> can work independently.
>>>>>>>
>>>>>>> For FLIP-73:
>>>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
>>>>>>> Executors work, as they are using the execute() method because this is
>>>>>>> the only "entry" to the user program. In this regard, I believe we
>>>>>>> should just see the fact that they have their dedicated environment as
>>>>>>> an "implementation detail".
>>>>>>> - for getting rid of the per-job mode: as a first note, there was
>>>>>>> already a discussion here:
>>>>>>>
>>>>>>
>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>>>>>>> with many people, including myself, expressing their opinion. I am
>>>>>>> mentioning that to show that this topic already has some history and
>>>>>>> the discussion does not start from scratch but there are already some
>>>>>>> contradicting opinions. My opinion is that we should not get rid of
>>>>>>> the per-job mode but I agree that we should discuss the
>>>>>>> semantics in more detail. Although in terms of code it may be tempting
>>>>>>> to "merge" the two submission modes, one of the main benefits of the
>>>>>>> per-job mode is isolation, both for resources and security, as the
>>>>>>> jobGraph to be executed is fixed and the cluster is "locked" just for
>>>>>>> that specific graph. This would be violated by having a session
>>>>>>> cluster launched and having all the infrastructure (ports and
>>>>>>> endpoints) set up for submitting any job to that cluster.
>>>>>>> - for getting rid of the "detached" mode: I agree with getting rid
>> of
>>>>>>> it but this implies some potential user-facing changes that should
>> be
>>>>>>> discussed.
>>>>>>>
>>>>>>> Given the above, I think that:
>>>>>>> 1) in the context of FLIP-73 we should not change any semantics but
>>>>>>> simply push the existing submission logic behind a reusable
>>>>>>> abstraction and make it usable via public APIs, as Aljoscha said.
>>>>>>> 2) as Till said, changing the semantics is beyond the scope of this
>>>>>>> FLIP and as Tison mentioned we should work towards decoupling
>>>>>>> discussions rather than the opposite. So let's discuss the
>>>>>>> future of the per-job and detached modes in a separate thread. This
>>>>>>> will also give proper visibility to such an important
>>>>>>> topic.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Thanks for your thoughts Aljoscha.
>>>>>>>>
>>>>>>>> Another question, since FLIP-73 might contain refactorings of the
>>>>>>>> Environment: shall we support something like PreviewPlanEnvironment?
>>>>>>>> If so, how? From a user perspective, the preview plan is useful because
>>>>>>>> it gives a visual view for modifying topologies and configuration
>>>>>>>> without submitting the job.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> tison.
>>>>>>>>
>>>>>>>>
>>>>>>>> Aljoscha Krettek <[hidden email]> wrote on Wed, Oct 2, 2019 at 10:10 PM:
>>>>>>>>
>>>>>>>>> I agree with Till that we should not change the semantics of per-job mode.
>>>>>>>>> In my opinion per-job mode means that the cluster (JobManager) is brought
>>>>>>>>> up with one job and it only executes that one job. There should be no open
>>>>>>>>> ports/anything that would allow submitting further jobs. This is very
>>>>>>>>> important for deployments in docker/Kubernetes or other environments where
>>>>>>>>> you bring up jobs without necessarily having the notion of a Flink cluster.
>>>>>>>>>
>>>>>>>>> What this means for a user program that has multiple execute() calls is
>>>>>>>>> that you will get a fresh cluster for each execute call. This also means
>>>>>>>>> that further execute() calls will only happen if the “client” is still
>>>>>>>>> alive, because it is the one driving execution. Currently, this only works
>>>>>>>>> if you start the job in “attached” mode. If you start in “detached” mode
>>>>>>>>> only the first execute() will happen and the rest will be ignored.
>>>>>>>>>
>>>>>>>>> This brings us to the tricky question about what to do about “detached”
>>>>>>>>> and “attached”. In the long run, I would like to get rid of the distinction
>>>>>>>>> and leave it up to the user program, by either blocking or not on the
>>>>>>>>> Future (or JobClient or whatnot) that job submission returns. This,
>>>>>>>>> however, means that users cannot simply request “detached” execution when
>>>>>>>>> using bin/flink; the user program has to “play along”. On the other hand,
>>>>>>>>> “detached” mode is quite strange for the user program. The execute() call
>>>>>>>>> either returns with a proper job result after the job ran (in “attached”
>>>>>>>>> mode) or with a dummy result (in “detached” mode) right after submission. I
>>>>>>>>> think this can even lead to weird cases where multiple “execute()” calls run
>>>>>>>>> in parallel. For per-job detached mode we also “throw” out of the first
>>>>>>>>> execute so the rest (including result processing logic) is ignored.
>>>>>>>>>
>>>>>>>>> For FLIP-73 we can (and should) ignore these problems, because
>>>>>>>>> FLIP-73 only moves the existing submission logic behind a reusable
>>>>>>>>> abstraction and makes it usable via API. We should closely follow up on the
>>>>>>>>> above points though because I think they are also important.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>>> On 2. Oct 2019, at 12:08, Zili Chen <[hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your clarification Till.
>>>>>>>>>>>
>>>>>>>>>>> I agree with the current semantics of the per-job mode: one should
>>>>>>>>>>> deploy a new cluster for each part of the job. Apart from the performance
>>>>>>>>>>> concern, it also means that the PerJobExecutor actually knows how to deploy
>>>>>>>>>>> a cluster, which differs from the description that an Executor submits a
>>>>>>>>>>> job.
>>>>>>>>>>>
>>>>>>>>>>> Anyway, it sounds workable and narrows the changes.
>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>
>>


Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

Aljoscha Krettek-2
Do you all think we could agree on the basic executor primitives and start voting on this FLIP? There are still some implementation details but I think we can discuss/tackle them when we get to them and the various people implementing this should be in close collaboration.

Best,
Aljoscha



Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

tison
Hi Aljoscha,

Now that the scope of this FLIP has been clearly narrowed, the Executor
interface and its discovery look good to me, so I would be glad to see the
vote thread.

As you said, we should still discuss implementation details, but I don't
think that should block the vote thread, because a vote means we generally
agree on the motivation and overall design.

As for making Executor.execute() async, that is much better than keeping the
sync/async distinction at this level. But I'd like to note that it only
works internally for now, because the user-facing interface is still
env.execute(), which blocks and returns a JobExecutionResult. I'm afraid that
several people depend on that result for post-execution processing, although
that doesn't work in the current per-job mode.
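
To make the point above concrete, here is a minimal sketch of an async executor sitting behind a blocking, user-facing call. It is only an illustration under assumptions: Pipeline, JobResult and executeBlocking are hypothetical stand-ins written for this example, not Flink's actual types or the interface proposed in the FLIP.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncExecutorSketch {

    /** Hypothetical stand-in for a pipeline; not Flink's Pipeline type. */
    interface Pipeline {
        String name();
    }

    /** Hypothetical stand-in for a job result; not Flink's JobExecutionResult. */
    static final class JobResult {
        final String jobName;
        final long netRuntimeMillis;

        JobResult(String jobName, long netRuntimeMillis) {
            this.jobName = jobName;
            this.netRuntimeMillis = netRuntimeMillis;
        }
    }

    /** Assumed async shape: submission returns a future instead of blocking. */
    interface Executor {
        CompletableFuture<JobResult> execute(Pipeline pipeline);
    }

    /**
     * Blocking wrapper in the spirit of env.execute(): callers that rely on
     * the result for post-execution processing keep working, while the
     * executor underneath stays async.
     */
    static JobResult executeBlocking(Executor executor, Pipeline pipeline) {
        // join() waits for completion and rethrows failures unchecked.
        return executor.execute(pipeline).join();
    }

    public static void main(String[] args) {
        // Toy executor that "runs" the job on another thread.
        Executor executor = pipeline ->
                CompletableFuture.supplyAsync(() -> new JobResult(pipeline.name(), 42L));

        Pipeline pipeline = () -> "word-count";

        JobResult result = executeBlocking(executor, pipeline);
        System.out.println(result.jobName + " finished in " + result.netRuntimeMillis + " ms");
    }
}
```

In this sketch the sync/async difference lives only in the wrapper, so the executor itself does not need to know whether the caller waits for the result.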

Best,
tison.



Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

Aljoscha Krettek-2
Hi Tison,

I agree. For now, the async Executor.execute() is an internal detail, but during your work on FLIP-74 it will probably reach the public API as well.
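
To make the internal/public split concrete, here is a minimal sketch (an illustration only, not code from the FLIP) of an Executor whose execute() returns a CompletableFuture, with a blocking env.execute()-style wrapper on top. Pipeline and JobExecutionResult are simplified stand-ins for the real Flink types, and executeBlocking() is a hypothetical helper name.

```java
import java.util.concurrent.CompletableFuture;

final class AsyncExecutorSketch {

    /** Simplified stand-in for Flink's Pipeline (assumption, not the real type). */
    interface Pipeline {
        String name();
    }

    /** Simplified stand-in for Flink's JobExecutionResult (assumption). */
    static final class JobExecutionResult {
        final String jobName;

        JobExecutionResult(String jobName) {
            this.jobName = jobName;
        }
    }

    /** Assumed async shape of the executor: submission returns a future. */
    interface Executor {
        CompletableFuture<JobExecutionResult> execute(Pipeline pipeline);
    }

    /** Hypothetical blocking wrapper, playing the role of today's env.execute(). */
    static JobExecutionResult executeBlocking(Executor executor, Pipeline pipeline) {
        // join() waits for completion and rethrows failures as unchecked exceptions.
        return executor.execute(pipeline).join();
    }

    public static void main(String[] args) {
        // Toy executor that "runs" the job on a background thread.
        Executor executor = pipeline ->
                CompletableFuture.supplyAsync(() -> new JobExecutionResult(pipeline.name()));

        Pipeline pipeline = () -> "word-count";
        JobExecutionResult result = executeBlocking(executor, pipeline);
        System.out.println("finished: " + result.jobName);
    }
}
```

With this shape, the sync/async decision lives only in the wrapper, so the executor itself would not need an attached/detached flag.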

Best,
Aljoscha

> On 4. Oct 2019, at 11:39, Zili Chen <[hidden email]> wrote:
>
> Hi Aljoscha,
>
> After clearly narrowing the scope of this FLIP, the Executor interface
> and its discovery look good to me, so I'm glad to see the vote thread.
>
> As you said, we should still discuss implementation details, but I don't
> think that should block the vote thread, because a vote means we generally
> agree on the motivation and overall design.
>
> As for Executor.execute() being async, that is much better than keeping the
> sync/async distinction at this level. But I'd like to note that it only
> works internally for now, because the user-facing interface is still
> env.execute, which blocks and returns a JobExecutionResult. I'm afraid that
> several people depend on the result for post-execution processing, although
> that doesn't work in the current per-job mode.
>
> Best,
> tison.
>
>
> On Fri, Oct 4, 2019 at 4:40 PM Aljoscha Krettek <[hidden email]> wrote:
>
>> Do you all think we could agree on the basic executor primitives and start
>> voting on this FLIP? There are still some implementation details but I
>> think we can discuss/tackle them when we get to them and the various people
>> implementing this should be in close collaboration.
>>
>> Best,
>> Aljoscha
>>
>>> On 4. Oct 2019, at 10:15, Aljoscha Krettek <[hidden email]> wrote:
>>>
>>> Hi,
>>>
>>> I think the end goal is to have only one environment per API, but I
>> think we won’t be able to achieve that in the short-term because of
>> backwards compatibility. This is most notable with the context environment,
>> preview environments etc.
>>>
>>> To keep this FLIP very slim we can make this only about the executors
>> and executor discovery. Anything else like job submission semantics,
>> detached mode, … can be tackled after this. If we don’t focus I’m afraid
>> this will drag on for quite a while.
>>>
>>> One thing I would like to propose to make this easier is to change
>> Executor.execute() to return a CompletableFuture and to completely remove
>> the “detached” logic from ClusterClient. That way, the new components make
>> no distinction between “detached” and “attached” but we can still do it in
>> the CLI (via the ContextEnvironment) to support the existing “detached”
>> behaviour of the CLI that users expect. What do you think about this?
>>>
>>> Best,
>>> Aljoscha
>>>
>>>> On 3. Oct 2019, at 10:03, Zili Chen <[hidden email]> wrote:
>>>>
>>>> Thanks for your explanation, Kostas; it clarifies the subtasks under
>>>> FLIP-73.
>>>>
>>>> As you described, changes to the Environments are included in this FLIP.
>>>> Regarding "each API to have a single Environment", it would be helpful to
>>>> describe which APIs we'd like to have after FLIP-73. And if we keep
>>>> multiple Environments, shall we keep the current way of injecting the
>>>> context environment for each API?
>>>>
>>>>
>>>> On Thu, Oct 3, 2019 at 1:44 PM Kostas Kloudas <[hidden email]> wrote:
>>>>
>>>>> Hi Tison,
>>>>>
>>>>> The changes that this FLIP proposes are:
>>>>> - the introduction of the Executor interface
>>>>> - the fact that everything in the current state of job submission in
>>>>> Flink can be defined through configuration parameters
>>>>> - implementation of Executors that do not change any of the semantics
>>>>> of the currently offered "modes" of job submission
>>>>>
>>>>> In this, and in the FLIP itself where the
>>>>> ExecutionEnvironment.execute() method is described, there are details
>>>>> about parts of the
>>>>> integration with the existing Flink code-base.
>>>>>
>>>>> So I am not sure what you mean by making the "integration a
>>>>> follow-up discussion".
>>>>>
>>>>> Cheers,
>>>>> Kostas
>>>>>
>>>>> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen <[hidden email]> wrote:
>>>>>>
>>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
>>>>>> Executors work, as they are using the execute() method because this is
>>>>>> the only "entry" to the user program. To this regard, I believe we
>>>>>> should just see the fact that they have their dedicated environment as
>>>>>> an "implementation detail".
>>>>>>
>>>>>> The proposal says
>>>>>>
>>>>>> In this document, we propose to abstract away from the Environments
>> the
>>>>> job
>>>>>> submission logic and put it in a newly introduced Executor. This will
>>>>>> allow *each
>>>>>> API to have a single Environment* which, based on the provided
>>>>>> configuration, will decide which executor to use, *e.g.* Yarn, Local,
>>>>> etc.
>>>>>> In addition, it will allow different APIs and downstream projects to
>>>>> re-use
>>>>>> the provided executors, thus limiting the amount of code duplication
>> and
>>>>>> the amount of code that has to be written.
>>>>>>
>>>>>> Note that "This will allow *each API to have a single Environment*"
>>>>>> seems to diverge a bit from your statement above. Or we could treat a
>>>>>> single Environment as a possible advantage that follows the introduction
>>>>>> of Executor, and exclude it from this pass.
>>>>>>
>>>>>> Best,
>>>>>> tison.
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 3, 2019 at 2:07 AM Zili Chen <[hidden email]> wrote:
>>>>>>
>>>>>>> BTW, correct me if I misunderstand, now that I have learned more about
>>>>>>> how our community works. Since FLIP-73 aims to introduce an interface
>>>>>>> with community consensus, the discussion is mostly about the interface,
>>>>>>> in order to properly define a useful and extensible API. The integration
>>>>>>> story could be a follow-up, since this FLIP does not affect current
>>>>>>> behavior at all.
>>>>>>>
>>>>>>> Best,
>>>>>>> tison.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 3, 2019 at 2:02 AM Zili Chen <[hidden email]> wrote:
>>>>>>>
>>>>>>>> Hi Kostas,
>>>>>>>>
>>>>>>>> It seems to do no harm to add a configuration parameter to
>>>>>>>> Executor#execute, since we can merge it with the configuration given
>>>>>>>> when the Executor was created and let the former override the latter.
>>>>>>>>
>>>>>>>> I can see it being useful that, conceptually, we can create one
>>>>>>>> Executor for a series of jobs submitted to the same cluster, but with
>>>>>>>> a different job configuration per pipeline.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> tison.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 3, 2019 at 1:37 AM Kostas Kloudas <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Hi again,
>>>>>>>>>
>>>>>>>>> I did not include this to my previous email, as this is related to
>>>>> the
>>>>>>>>> proposal on the FLIP itself.
>>>>>>>>>
>>>>>>>>> In the existing proposal, the Executor interface is the following.
>>>>>>>>>
>>>>>>>>> public interface Executor {
>>>>>>>>>
>>>>>>>>> JobExecutionResult execute(Pipeline pipeline) throws Exception;
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> This implies that all the necessary information for the execution
>> of
>>>>> a
>>>>>>>>> Pipeline should be included in the Configuration passed in the
>>>>>>>>> ExecutorFactory which instantiates the Executor itself. This should
>>>>>>>>> include, for example, all the parameters currently supplied by the
>>>>>>>>> ProgramOptions, which are conceptually not executor parameters but
>>>>>>>>> rather parameters for the execution of the specific pipeline. To
>> this
>>>>>>>>> end, I would like to propose a change in the current Executor
>>>>>>>>> interface showcased below:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> public interface Executor {
>>>>>>>>>
>>>>>>>>> JobExecutionResult execute(Pipeline pipeline, Configuration
>>>>>>>>> executionOptions) throws Exception;
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> The above will allow to have the Executor specific options passed
>> in
>>>>>>>>> the configuration given during executor instantiation, while the
>>>>>>>>> pipeline specific options can be passed in the executionOptions.
>> As a
>>>>>>>>> positive side-effect, this will make Executors re-usable, i.e.
>>>>>>>>> instantiate an executor and use it to execute multiple pipelines,
>> if
>>>>>>>>> in the future we choose to do so.
>>>>>>>>>
>>>>>>>>> Let me know what do you think,
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <[hidden email]
>>>
>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> I agree with Tison that we should disentangle threads so that
>>>>> people
>>>>>>>>>> can work independently.
>>>>>>>>>>
>>>>>>>>>> For FLIP-73:
>>>>>>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
>>>>>>>>>> Executors work, as they are using the execute() method because
>>>>> this is
>>>>>>>>>> the only "entry" to the user program. To this regard, I believe we
>>>>>>>>>> should just see the fact that they have their dedicated
>>>>> environment as
>>>>>>>>>> an "implementation detail".
>>>>>>>>>> - for getting rid of the per-job mode: as a first note, there was
>>>>>>>>>> already a discussion here:
>>>>>>>>>>
>>>>>>>>>
>>>>>
>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>>>>>>>>>> with many people, including myself, expressing their opinion. I am
>>>>>>>>>> mentioning that to show that this topic already has some history
>>>>> and
>>>>>>>>>> the discussion does not start from scratch but there are already
>>>>> some
>>>>>>>>>> contradicting opinions. My opinion is that we should not get rid
>> of
>>>>>>>>>> the per-job mode but I agree that we should discuss about the
>>>>>>>>>> semantics in more detail. Although in terms of code it may be
>>>>> tempting
>>>>>>>>>> to "merge" the two submission modes, one of the main benefits of
>>>>> the
>>>>>>>>>> per-job mode is isolation, both for resources and security, as the
>>>>>>>>>> jobGraph to be executed is fixed and the cluster is "locked" just
>>>>> for
>>>>>>>>>> that specific graph. This would be violated by having a session
>>>>>>>>>> cluster launched and having all the infrastructure (ports and
>>>>>>>>>> endpoints) set up for submitting any job to that cluster.
>>>>>>>>>> - for getting rid of the "detached" mode: I agree with getting rid
>>>>> of
>>>>>>>>>> it but this implies some potential user-facing changes that should
>>>>> be
>>>>>>>>>> discussed.
>>>>>>>>>>
>>>>>>>>>> Given the above, I think that:
>>>>>>>>>> 1) in the context of FLIP-73 we should not change any semantics
>> but
>>>>>>>>>> simply push the existing submission logic behind a reusable
>>>>>>>>>> abstraction and make it usable via public APIs, as Aljoscha said.
>>>>>>>>>> 2) as Till said, changing the semantics is beyond the scope of
>> this
>>>>>>>>>> FLIP and as Tison mentioned we should work towards decoupling
>>>>>>>>>> discussions rather than the opposite. So let's discuss about the
>>>>>>>>>> future of the per-job and detached modes in a separate thread.
>> This
>>>>>>>>>> will also allow to give the proper visibility to such an important
>>>>>>>>>> topic.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Kostas
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <[hidden email]>
>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your thoughts Aljoscha.
>>>>>>>>>>>
>>>>>>>>>>> Another question, since FLIP-73 might contain refactoring of the
>>>>>>>>>>> Environments: shall we support something like
>>>>>>>>>>> PreviewPlanEnvironment? If so, how? From a user perspective, the
>>>>>>>>>>> preview plan is useful because it gives a visual view for modifying
>>>>>>>>>>> topologies and configuration without submitting the job.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> tison.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 2, 2019 at 10:10 PM Aljoscha Krettek <[hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I agree with Till that we should not change the semantics of
>>>>>>>>> per-job mode.
>>>>>>>>>>>> In my opinion per-job mode means that the cluster (JobManager)
>>>>> is
>>>>>>>>> brought
>>>>>>>>>>>> up with one job and it only executes that one job. There
>>>>> should be
>>>>>>>>> no open
>>>>>>>>>>>> ports/anything that would allow submitting further jobs. This
>>>>> is
>>>>>>>>> very
>>>>>>>>>>>> important for deployments in docker/Kubernetes or other
>>>>>>>>> environments where
>>>>>>>>>>>> you bring up jobs without necessarily having the notion of a
>>>>> Flink
>>>>>>>>> cluster.
>>>>>>>>>>>>
>>>>>>>>>>>> What this means for a user program that has multiple execute()
>>>>>>>>> calls is
>>>>>>>>>>>> that you will get a fresh cluster for each execute call. This
>>>>> also
>>>>>>>>> means,
>>>>>>>>>>>> that further execute() calls will only happen if the “client”
>>>>> is
>>>>>>>>> still
>>>>>>>>>>>> alive, because it is the one driving execution. Currently, this
>>>>>>>>> only works
>>>>>>>>>>>> if you start the job in “attached” mode. If you start in
>>>>>>>>> “detached” mode
>>>>>>>>>>>> only the first execute() will happen and the rest will be
>>>>> ignored.
>>>>>>>>>>>>
>>>>>>>>>>>> This brings us to the tricky question about what to do about
>>>>>>>>> “detached”
>>>>>>>>>>>> and “attached”. In the long run, I would like to get rid of the
>>>>>>>>> distinction
>>>>>>>>>>>> and leave it up to the user program, by either blocking or not
>>>>> on
>>>>>>>>> the
>>>>>>>>>>>> Future (or JobClient or whatnot) that job submission returns.
>>>>> This,
>>>>>>>>>>>> however, means that users cannot simply request “detached”
>>>>>>>>> execution when
>>>>>>>>>>>> using bin/flink, the user program has to “play along”. On the
>>>>>>>>> other hand,
>>>>>>>>>>>> “detached” mode is quite strange for the user program. The
>>>>>>>>> execute() call
>>>>>>>>>>>> either returns with a proper job result after the job ran (in
>>>>>>>>> “attached”
>>>>>>>>>>>> mode) or with a dummy result (in “detached” mode) right after
>>>>>>>>> submission. I
>>>>>>>>>>>> think this can even lead to weird cases where multiple
>>>>> "execute()”
>>>>>>>>> run in
>>>>>>>>>>>> parallel. For per-job detached mode we also “throw” out of the
>>>>>>>>> first
>>>>>>>>>>>> execute so the rest (including result processing logic) is
>>>>> ignored.
>>>>>>>>>>>>
>>>>>>>>>>>> For this here FLIP-73 we can (and should) ignore these
>>>>> problems,
>>>>>>>>> because
>>>>>>>>>>>> FLIP-73 only moves the existing submission logic behind a
>>>>> reusable
>>>>>>>>>>>> abstraction and makes it usable via API. We should closely
>>>>> follow
>>>>>>>>> up on the
>>>>>>>>>>>> above points though because I think they are also important.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>
>>>>>>>>>>>>> On 2. Oct 2019, at 12:08, Zili Chen <[hidden email]>
>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your clarification Till.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I agree with the current semantics of the per-job mode, one
>>>>>>>>> should
>>>>>>>>>>>> deploy a
>>>>>>>>>>>>> new cluster for each part of the job. Apart from the
>>>>> performance
>>>>>>>>> concern
>>>>>>>>>>>>> it also means that PerJobExecutor knows how to deploy a
>>>>> cluster
>>>>>>>>> actually,
>>>>>>>>>>>>> which is different from the description that Executor submit
>>>>> a
>>>>>>>>> job.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Anyway it sounds workable and narrow the changes.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>
>>>
>>
>>


Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

Thomas Weise
It might be useful to mention in FLIP-73 that Executor.execute is intended to
be an asynchronous API once it becomes public, and to refer to FLIP-74 in
that context.
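
As a rough illustration of what an asynchronous Executor.execute could mean for callers (a sketch under assumptions, not the FLIP-73 or FLIP-74 design): if submission returns a future, "attached" and "detached" reduce to whether the caller waits on it. FakeCluster, submit() and finish() are hypothetical names invented for this example, and the job result is reduced to a String.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class AttachedVsDetachedSketch {

    /** Hypothetical in-memory stand-in for a cluster that accepts jobs. */
    static final class FakeCluster {
        private final Map<String, CompletableFuture<String>> jobs = new ConcurrentHashMap<>();

        /** Async submission: returns immediately with a future for the job result. */
        CompletableFuture<String> submit(String jobName) {
            CompletableFuture<String> result = new CompletableFuture<>();
            jobs.put(jobName, result);
            return result;
        }

        /** Simulates the job reaching a terminal state on the cluster. */
        void finish(String jobName) {
            jobs.get(jobName).complete(jobName + " FINISHED");
        }
    }

    public static void main(String[] args) {
        FakeCluster cluster = new FakeCluster();

        // "Detached": the caller keeps the future but does not wait on it.
        CompletableFuture<String> detached = cluster.submit("job-a");
        System.out.println("job-a submitted, done=" + detached.isDone());

        // "Attached": the caller blocks on the same kind of future.
        CompletableFuture<String> attached = cluster.submit("job-b");
        CompletableFuture.runAsync(() -> cluster.finish("job-b"));
        System.out.println(attached.join());
    }
}
```

Both paths use the same submission call; only the caller's decision to wait differs.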


On Fri, Oct 4, 2019 at 2:52 AM Aljoscha Krettek <[hidden email]> wrote:

> Hi Tison,
>
> I agree, for now the async Executor.execute() is an internal detail but
> during your work for FLIP-74 it will probably also reach the public API.
>
> Best,
> Aljoscha
>
> > On 4. Oct 2019, at 11:39, Zili Chen <[hidden email]> wrote:
> >
> > Hi Aljoscha,
> >
> > After clearly narrowing the scope of this FLIP, the Executor interface
> > and its discovery look good to me, so I'm glad to see the vote thread.
> >
> > As you said, we should still discuss implementation details, but I don't
> > think that should block the vote thread, because a vote means we generally
> > agree on the motivation and overall design.
> >
> > As for Executor.execute() being async, that is much better than keeping the
> > sync/async distinction at this level. But I'd like to note that it only
> > works internally for now, because the user-facing interface is still
> > env.execute, which blocks and returns a JobExecutionResult. I'm afraid that
> > several people depend on the result for post-execution processing, although
> > that doesn't work in the current per-job mode.
> >
> > Best,
> > tison.
> >
> >
> > On Fri, Oct 4, 2019 at 4:40 PM Aljoscha Krettek <[hidden email]> wrote:
> >
> >> Do you all think we could agree on the basic executor primitives and
> start
> >> voting on this FLIP? There are still some implementation details but I
> >> think we can discuss/tackle them when we get to them and the various
> people
> >> implementing this should be in close collaboration.
> >>
> >> Best,
> >> Aljoscha
> >>
> >>> On 4. Oct 2019, at 10:15, Aljoscha Krettek <[hidden email]>
> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I think the end goal is to have only one environment per API, but I
> >> think we won’t be able to achieve that in the short-term because of
> >> backwards compatibility. This is most notable with the context
> environment,
> >> preview environments etc.
> >>>
> >>> To keep this FLIP very slim we can make this only about the executors
> >> and executor discovery. Anything else like job submission semantics,
> >> detached mode, … can be tackled after this. If we don’t focus I’m afraid
> >> this will drag on for quite a while.
> >>>
> >>> One thing I would like to propose to make this easier is to change
> >> Executor.execute() to return a CompletableFuture and to completely
> remove
> >> the “detached” logic from ClusterClient. That way, the new components
> make
> >> no distinction between “detached” and “attached” but we can still do it
> in
> >> the CLI (via the ContextEnvironment) to support the existing “detached”
> >> behaviour of the CLI that users expect. What do you think about this?
> >>>
> >>> Best,
> >>> Aljoscha
> >>>
> >>>> On 3. Oct 2019, at 10:03, Zili Chen <[hidden email]> wrote:
> >>>>
> >>>> Thanks for your explanation Kostas to make it clear subtasks under
> >> FLIP-73.
> >>>>
> >>>> As you described, changes of Environment are included in this FLIP.
> For
> >>>> "each
> >>>> API to have a single Environment", it could be helpful to describe
> which
> >>>> APIs we'd
> >>>> like to have after FLIP-73. And if we keep multiple Environments,
> shall
> >> we
> >>>> keep the
> >>>> way inject context environment for each API?
> >>>>
> >>>>
> >>>> On Thu, Oct 3, 2019 at 1:44 PM Kostas Kloudas <[hidden email]> wrote:
> >>>>
> >>>>> Hi Tison,
> >>>>>
> >>>>> The changes that this FLIP propose are:
> >>>>> - the introduction of the Executor interface
> >>>>> - the fact that everything in the current state of job submission in
> >>>>> Flink can be defined through configuration parameters
> >>>>> - implementation of Executors that do not change any of the semantics
> >>>>> of the currently offered "modes" of job submission
> >>>>>
> >>>>> In this, and in the FLIP itself where the
> >>>>> ExecutionEnvironment.execute() method is described, there are details
> >>>>> about parts of the
> >>>>> integration with the existing Flink code-base.
> >>>>>
> >>>>> So I am not sure what do you mean by making the "integration a
> >>>>> follow-up discussion".
> >>>>>
> >>>>> Cheers,
> >>>>> Kostas
> >>>>>
> >>>>> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen <[hidden email]>
> wrote:
> >>>>>>
> >>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
> >>>>>> Executors work, as they are using the execute() method because this
> is
> >>>>>> the only "entry" to the user program. To this regard, I believe we
> >>>>>> should just see the fact that they have their dedicated environment
> as
> >>>>>> an "implementation detail".
> >>>>>>
> >>>>>> The proposal says
> >>>>>>
> >>>>>> In this document, we propose to abstract away from the Environments
> >> the
> >>>>> job
> >>>>>> submission logic and put it in a newly introduced Executor. This
> will
> >>>>>> allow *each
> >>>>>> API to have a single Environment* which, based on the provided
> >>>>>> configuration, will decide which executor to use, *e.g.* Yarn,
> Local,
> >>>>> etc.
> >>>>>> In addition, it will allow different APIs and downstream projects to
> >>>>> re-use
> >>>>>> the provided executors, thus limiting the amount of code duplication
> >> and
> >>>>>> the amount of code that has to be written.
> >>>>>>
> >>>>>> note that This will allow *each API to have a single Environment*
> it
> >>>>>> seems a bit diverge with you statement above. Or we say a single
> >>>>> Environment
> >>>>>> as a possible advantage after the introduction of Executor so that
> we
> >>>>>> exclude it
> >>>>>> from this pass.
> >>>>>>
> >>>>>> Best,
> >>>>>> tison.
> >>>>>>
> >>>>>>
> >>>>>> On Thu, Oct 3, 2019 at 2:07 AM Zili Chen <[hidden email]> wrote:
> >>>>>>
> >>>>>>> BTW, correct me if I misunderstand, now I learn more about our
> >>>>> community
> >>>>>>> way. Since FLIP-73 aimed at introducing an interface with community
> >>>>>>> consensus the discussion is more about the interface in order to
> >>>>> properly
> >>>>>>> define a useful and extensible API. The integration story could be
> a
> >>>>>>> follow up
> >>>>>>> since this one does not affect current behavior at all.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> tison.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Oct 3, 2019 at 2:02 AM Zili Chen <[hidden email]> wrote:
> >>>>>>>
> >>>>>>>> Hi Kostas,
> >>>>>>>>
> >>>>>>>> It seems does no harm we have a configuration parameter of
> >>>>>>>> Executor#execute
> >>>>>>>> since we can merge this one with the one configured on Executor
> >>>>> created
> >>>>>>>> and
> >>>>>>>> let this one overwhelm that one.
> >>>>>>>>
> >>>>>>>> I can see it is useful that conceptually we can create an Executor
> >>>>> for a
> >>>>>>>> series jobs
> >>>>>>>> to the same cluster but with different job configuration per
> >> pipeline.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> tison.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, Oct 3, 2019 at 1:37 AM Kostas Kloudas <[hidden email]> wrote:
> >>>>>>>>
> >>>>>>>>> Hi again,
> >>>>>>>>>
> >>>>>>>>> I did not include this to my previous email, as this is related
> to
> >>>>> the
> >>>>>>>>> proposal on the FLIP itself.
> >>>>>>>>>
> >>>>>>>>> In the existing proposal, the Executor interface is the
> following.
> >>>>>>>>>
> >>>>>>>>> public interface Executor {
> >>>>>>>>>
> >>>>>>>>> JobExecutionResult execute(Pipeline pipeline) throws Exception;
> >>>>>>>>>
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> This implies that all the necessary information for the execution
> >> of
> >>>>> a
> >>>>>>>>> Pipeline should be included in the Configuration passed in the
> >>>>>>>>> ExecutorFactory which instantiates the Executor itself. This
> should
> >>>>>>>>> include, for example, all the parameters currently supplied by
> the
> >>>>>>>>> ProgramOptions, which are conceptually not executor parameters
> but
> >>>>>>>>> rather parameters for the execution of the specific pipeline. To
> >> this
> >>>>>>>>> end, I would like to propose a change in the current Executor
> >>>>>>>>> interface showcased below:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> public interface Executor {
> >>>>>>>>>
> >>>>>>>>> JobExecutionResult execute(Pipeline pipeline, Configuration
> >>>>>>>>> executionOptions) throws Exception;
> >>>>>>>>>
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> The above will allow to have the Executor specific options passed
> >> in
> >>>>>>>>> the configuration given during executor instantiation, while the
> >>>>>>>>> pipeline specific options can be passed in the executionOptions.
> >> As a
> >>>>>>>>> positive side-effect, this will make Executors re-usable, i.e.
> >>>>>>>>> instantiate an executor and use it to execute multiple pipelines,
> >> if
> >>>>>>>>> in the future we choose to do so.
> >>>>>>>>>
> >>>>>>>>> Let me know what do you think,
> >>>>>>>>> Kostas
> >>>>>>>>>
> >>>>>>>>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <
> [hidden email]
> >>>
> >>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> I agree with Tison that we should disentangle threads so that
> >>>>> people
> >>>>>>>>>> can work independently.
> >>>>>>>>>>
> >>>>>>>>>> For FLIP-73:
> >>>>>>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to
> the
> >>>>>>>>>> Executors work, as they are using the execute() method because
> >>>>> this is
> >>>>>>>>>> the only "entry" to the user program. To this regard, I believe
> we
> >>>>>>>>>> should just see the fact that they have their dedicated
> >>>>> environment as
> >>>>>>>>>> an "implementation detail".
> >>>>>>>>>> - for getting rid of the per-job mode: as a first note, there
> was
> >>>>>>>>>> already a discussion here:
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>
> >>
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> >>>>>>>>>> with many people, including myself, expressing their opinion. I
> am
> >>>>>>>>>> mentioning that to show that this topic already has some history
> >>>>> and
> >>>>>>>>>> the discussion does not start from scratch but there are already
> >>>>> some
> >>>>>>>>>> contradicting opinions. My opinion is that we should not get rid
> >> of
> >>>>>>>>>> the per-job mode but I agree that we should discuss about the
> >>>>>>>>>> semantics in more detail. Although in terms of code it may be
> >>>>> tempting
> >>>>>>>>>> to "merge" the two submission modes, one of the main benefits of
> >>>>> the
> >>>>>>>>>> per-job mode is isolation, both for resources and security, as
> the
> >>>>>>>>>> jobGraph to be executed is fixed and the cluster is "locked"
> just
> >>>>> for
> >>>>>>>>>> that specific graph. This would be violated by having a session
> >>>>>>>>>> cluster launched and having all the infrastructure (ports and
> >>>>>>>>>> endpoints) set up for submitting any job to that cluster.
> >>>>>>>>>> - for getting rid of the "detached" mode: I agree with getting
> rid
> >>>>> of
> >>>>>>>>>> it but this implies some potential user-facing changes that
> should
> >>>>> be
> >>>>>>>>>> discussed.
> >>>>>>>>>>
> >>>>>>>>>> Given the above, I think that:
> >>>>>>>>>> 1) in the context of FLIP-73 we should not change any semantics
> >> but
> >>>>>>>>>> simply push the existing submission logic behind a reusable
> >>>>>>>>>> abstraction and make it usable via public APIs, as Aljoscha
> said.
> >>>>>>>>>> 2) as Till said, changing the semantics is beyond the scope of
> >> this
> >>>>>>>>>> FLIP and as Tison mentioned we should work towards decoupling
> >>>>>>>>>> discussions rather than the opposite. So let's discuss about the
> >>>>>>>>>> future of the per-job and detached modes in a separate thread.
> >> This
> >>>>>>>>>> will also allow to give the proper visibility to such an
> important
> >>>>>>>>>> topic.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Kostas
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <[hidden email]>
> >>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for your thoughts Aljoscha.
> >>>>>>>>>>>
> >>>>>>>>>>> Another question since FLIP-73 might contains refactors on
> >>>>>>>>> Environment:
> >>>>>>>>>>> shall we support
> >>>>>>>>>>> something like PreviewPlanEnvironment? If so, how? From a user
> >>>>>>>>> perspective
> >>>>>>>>>>> preview plan
> >>>>>>>>>>> is useful, by give visual view, to modify topos and configure
> >>>>> without
> >>>>>>>>>>> submit it.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> tison.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Oct 2, 2019 at 10:10 PM Aljoscha Krettek <[hidden email]> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I agree with Till that we should not change the semantics of
> >>>>>>>>> per-job mode.
> >>>>>>>>>>>> In my opinion per-job mode means that the cluster (JobManager)
> >>>>> is
> >>>>>>>>> brought
> >>>>>>>>>>>> up with one job and it only executes that one job. There
> >>>>> should be
> >>>>>>>>> no open
> >>>>>>>>>>>> ports/anything that would allow submitting further jobs. This
> >>>>> is
> >>>>>>>>> very
> >>>>>>>>>>>> important for deployments in docker/Kubernetes or other
> >>>>>>>>> environments where
> >>>>>>>>>>>> you bring up jobs without necessarily having the notion of a
> >>>>> Flink
> >>>>>>>>> cluster.
> >>>>>>>>>>>>
> >>>>>>>>>>>> What this means for a user program that has multiple execute()
> >>>>>>>>> calls is
> >>>>>>>>>>>> that you will get a fresh cluster for each execute call. This
> >>>>> also
> >>>>>>>>> means,
> >>>>>>>>>>>> that further execute() calls will only happen if the “client”
> >>>>>>>>>>>> is still alive, because it is the one driving execution. Currently, this
> >>>>>>>>>>>> only works if you start the job in “attached” mode. If you start in
> >>>>>>>>>>>> “detached” mode only the first execute() will happen and the rest will be
> >>>>>>>>>>>> ignored.
> >>>>>>>>>>>>
> >>>>>>>>>>>> This brings us to the tricky question about what to do about “detached”
> >>>>>>>>>>>> and “attached”. In the long run, I would like to get rid of the distinction
> >>>>>>>>>>>> and leave it up to the user program, by either blocking or not on the
> >>>>>>>>>>>> Future (or JobClient or whatnot) that job submission returns. This,
> >>>>>>>>>>>> however, means that users cannot simply request “detached” execution when
> >>>>>>>>>>>> using bin/flink, the user program has to “play along”. On the other hand,
> >>>>>>>>>>>> “detached” mode is quite strange for the user program. The execute() call
> >>>>>>>>>>>> either returns with a proper job result after the job ran (in “attached”
> >>>>>>>>>>>> mode) or with a dummy result (in “detached” mode) right after submission. I
> >>>>>>>>>>>> think this can even lead to weird cases where multiple “execute()” run in
> >>>>>>>>>>>> parallel. For per-job detached mode we also “throw” out of the first
> >>>>>>>>>>>> execute so the rest (including result processing logic) is ignored.
> >>>>>>>>>>>>
> >>>>>>>>>>>> For this here FLIP-73 we can (and should) ignore these problems, because
> >>>>>>>>>>>> FLIP-73 only moves the existing submission logic behind a reusable
> >>>>>>>>>>>> abstraction and makes it usable via API. We should closely follow up on the
> >>>>>>>>>>>> above points though because I think they are also important.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On 2. Oct 2019, at 12:08, Zili Chen <[hidden email]> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for your clarification Till.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I agree with the current semantics of the per-job mode, one should deploy a
> >>>>>>>>>>>>> new cluster for each part of the job. Apart from the performance concern
> >>>>>>>>>>>>> it also means that PerJobExecutor knows how to deploy a cluster actually,
> >>>>>>>>>>>>> which is different from the description that Executor submit a job.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Anyway it sounds workable and narrow the changes.

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

tison
Hi Kostas & Aljoscha,

I'm drafting a plan for exposing multi-layered clients. It is mainly about
how we distinguish the different layers and which clients we're going to
expose.

Within the scope of FLIP-73, I'd like to ask whether Executor will become
a public interface that downstream project developers can use, or whether
it is just an internal concept for unifying job submission.
If it is the latter, I feel the multi-layered client topic is entirely
independent of Executor.

Best,
tison.


Thomas Weise <[hidden email]> wrote on Sat, Oct 5, 2019 at 12:17 AM:

> It might be useful to mention on FLIP-73 that the intention for
> Executor.execute is to be an asynchronous API once it becomes public and
> also refer to FLIP-74 as such.
>
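
As a rough illustration of the asynchronous Executor.execute() mentioned in
the quoted message above, the sketch below shows how a future-returning
submission call lets the caller choose between "attached" and "detached"
behaviour simply by blocking or not blocking on the result. Every type and
method name here (SketchPipeline, SketchJobResult, AsyncExecutorSketch,
toyExecutor) is a hypothetical stand-in invented for this example; none of it
is the interface from FLIP-73 or Flink's actual API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins; these are NOT Flink's actual classes.
final class SketchPipeline {
    final String name;
    SketchPipeline(String name) { this.name = name; }
}

final class SketchJobResult {
    final String jobName;
    SketchJobResult(String jobName) { this.jobName = jobName; }
}

// Assumed shape of an asynchronous executor: submission returns a future
// instead of blocking until the job has finished.
interface AsyncExecutorSketch {
    CompletableFuture<SketchJobResult> execute(SketchPipeline pipeline);
}

final class AsyncExecutorDemo {
    // Toy executor that "runs" the pipeline on a background thread.
    static AsyncExecutorSketch toyExecutor() {
        return pipeline ->
                CompletableFuture.supplyAsync(() -> new SketchJobResult(pipeline.name));
    }

    // "Attached" behaviour: the caller blocks on the future.
    static SketchJobResult runAttached(AsyncExecutorSketch executor, SketchPipeline pipeline) {
        return executor.execute(pipeline).join();
    }

    // "Detached" behaviour: the caller gets the future back without waiting.
    static CompletableFuture<SketchJobResult> runDetached(
            AsyncExecutorSketch executor, SketchPipeline pipeline) {
        return executor.execute(pipeline);
    }

    public static void main(String[] args) {
        AsyncExecutorSketch executor = toyExecutor();

        SketchJobResult result = runAttached(executor, new SketchPipeline("word-count"));
        System.out.println("attached result: " + result.jobName);

        CompletableFuture<SketchJobResult> pending =
                runDetached(executor, new SketchPipeline("etl"));
        // The caller decides if and when to wait; here we wait only so the demo prints.
        pending.thenAccept(r -> System.out.println("detached job finished: " + r.jobName)).join();
    }
}
```

With this shape the executor itself carries no "attached"/"detached" flag; a
blocking wrapper such as env.execute() could be layered on top by calling
join() on the returned future.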

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

Kostas Kloudas-4
Hi Tison,

I would say that as a first step, and until we see that the interfaces
we introduce cover all intended purposes, we should keep the Executors
non-public.
From the previous discussion, I think that in general the Clients are
independent of the Executors, as the Executors simply use the
clients to submit jobs and return a cluster client.

Cheers,
Kostas
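
The layering described above, where an Executor only uses a client to submit
a job and then returns that client, could look roughly like the following
sketch. All names (SketchClusterClient, ClientBackedExecutor, InMemoryClient)
are hypothetical and chosen for illustration only; the string-based "pipeline"
and "job graph" are placeholders, not Flink types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical client layer: knows how to talk to a cluster and
// has no dependency on any executor type.
interface SketchClusterClient {
    String submit(String jobGraph); // returns a job id
}

// Hypothetical executor layer: translates a pipeline and delegates
// the actual submission to the client it was given.
final class ClientBackedExecutor {
    private final SketchClusterClient client;

    ClientBackedExecutor(SketchClusterClient client) {
        this.client = client;
    }

    // Submits the pipeline and returns the client, so the caller can
    // keep interacting with the cluster afterwards.
    SketchClusterClient execute(String pipeline) {
        String jobGraph = "graph(" + pipeline + ")"; // placeholder for pipeline translation
        client.submit(jobGraph);
        return client;
    }
}

// Toy client that only records what was submitted.
final class InMemoryClient implements SketchClusterClient {
    final List<String> submitted = new ArrayList<>();

    @Override
    public String submit(String jobGraph) {
        submitted.add(jobGraph);
        return "job-" + submitted.size();
    }
}

final class LayeringDemo {
    public static void main(String[] args) {
        InMemoryClient client = new InMemoryClient();
        ClientBackedExecutor executor = new ClientBackedExecutor(client);
        executor.execute("word-count");
        System.out.println(client.submitted);
    }
}
```

Because the dependency points only from the executor to the client, the
client interfaces can be designed and exposed on their own schedule while the
executor stays internal.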

On Wed, Oct 9, 2019 at 7:01 PM Zili Chen <[hidden email]> wrote:

>
> Hi Kostas & Aljoscha,
>
> I'm drafting a plan for exposing multi-layered clients. It is mainly about
> how we distinguish the different layers and which clients we're going to
> expose.
>
> Within the scope of FLIP-73, I'd like to ask whether Executor will become
> a public interface that downstream project developers can use, or whether
> it is just an internal concept for unifying job submission.
> If it is the latter, I feel the multi-layered client topic is entirely
> independent of Executor.
>
> Best,
> tison.
> > > >>>>>>>>>>> Aljoscha Krettek <[hidden email]> 于2019年10月2日周三
> > > 下午10:10写道:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> I agree with Till that we should not change the semantics of
> > > >>>>>>>>> per-job mode.
> > > >>>>>>>>>>>> In my opinion per-job mode means that the cluster
> > (JobManager)
> > > >>>>> is
> > > >>>>>>>>> brought
> > > >>>>>>>>>>>> up with one job and it only executes that one job. There
> > > >>>>> should be
> > > >>>>>>>>> no open
> > > >>>>>>>>>>>> ports/anything that would allow submitting further jobs.
> > This
> > > >>>>> is
> > > >>>>>>>>> very
> > > >>>>>>>>>>>> important for deployments in docker/Kubernetes or other
> > > >>>>>>>>> environments were
> > > >>>>>>>>>>>> you bring up jobs without necessarily having the notion of a
> > > >>>>> Flink
> > > >>>>>>>>> cluster.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> What this means for a user program that has multiple
> > execute()
> > > >>>>>>>>> calls is
> > > >>>>>>>>>>>> that you will get a fresh cluster for each execute call.
> > This
> > > >>>>> also
> > > >>>>>>>>> means,
> > > >>>>>>>>>>>> that further execute() calls will only happen if the
> > “client”
> > > >>>>> is
> > > >>>>>>>>> still
> > > >>>>>>>>>>>> alive, because it is the one driving execution. Currently,
> > > this
> > > >>>>>>>>> only works
> > > >>>>>>>>>>>> if you start the job in “attached” mode. If you start in
> > > >>>>>>>>> “detached” mode
> > > >>>>>>>>>>>> only the first execute() will happen and the rest will be
> > > >>>>> ignored.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> This brings us to the tricky question about what to do about
> > > >>>>>>>>> “detached”
> > > >>>>>>>>>>>> and “attached”. In the long run, I would like to get rid of
> > > the
> > > >>>>>>>>> distinction
> > > >>>>>>>>>>>> and leave it up to the user program, by either blocking or
> > not
> > > >>>>> on
> > > >>>>>>>>> the
> > > >>>>>>>>>>>> Future (or JobClient or whatnot) that job submission
> > returns.
> > > >>>>> This,
> > > >>>>>>>>>>>> however, means that users cannot simply request “detached”
> > > >>>>>>>>> execution when
> > > >>>>>>>>>>>> using bin/flink, the user program has to “play along”. On
> > the
> > > >>>>>>>>> other hand,
> > > >>>>>>>>>>>> “detached” mode is quite strange for the user program. The
> > > >>>>>>>>> execute() call
> > > >>>>>>>>>>>> either returns with a proper job result after the job ran
> > (in
> > > >>>>>>>>> “attached”
> > > >>>>>>>>>>>> mode) or with a dummy result (in “detached” mode) right
> > after
> > > >>>>>>>>> submission. I
> > > >>>>>>>>>>>> think this can even lead to weird cases where multiple
> > > >>>>> "execute()”
> > > >>>>>>>>> run in
> > > >>>>>>>>>>>> parallel. For per-job detached mode we also “throw” out of
> > the
> > > >>>>>>>>> first
> > > >>>>>>>>>>>> execute so the rest (including result processing logic) is
> > > >>>>> ignored.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> For this here FLIP-73 we can (and should) ignore these
> > > >>>>> problems,
> > > >>>>>>>>> because
> > > >>>>>>>>>>>> FLIP-73 only moves the existing submission logic behind a
> > > >>>>> reusable
> > > >>>>>>>>>>>> abstraction and makes it usable via API. We should closely
> > > >>>>> follow
> > > >>>>>>>>> up on the
> > > >>>>>>>>>>>> above points though because I think they are also important.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Best,
> > > >>>>>>>>>>>> Aljoscha
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> On 2. Oct 2019, at 12:08, Zili Chen <[hidden email]>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks for your clarification Till.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> I agree with the current semantics of the per-job mode, one
> > > >>>>>>>>> should
> > > >>>>>>>>>>>> deploy a
> > > >>>>>>>>>>>>> new cluster for each part of the job. Apart from the
> > > >>>>> performance
> > > >>>>>>>>> concern
> > > >>>>>>>>>>>>> it also means that PerJobExecutor knows how to deploy a
> > > >>>>> cluster
> > > >>>>>>>>> actually,
> > > >>>>>>>>>>>>> which is different from the description that Executor
> > submit
> > > >>>>> a
> > > >>>>>>>>> job.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Anyway it sounds workable and narrow the changes.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>
> > > >>>
> > > >>
> > > >>
> > >
> > >
> >

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

tison
Thanks for your explanation Kostas.

I agree that the Clients are independent of the Executors. One thing
I wonder from your text: does Executor#execute return a cluster
client or a job client? As discussed previously, I think
conceptually it is a job client.

Best,
tison.


Kostas Kloudas <[hidden email]> wrote on Thu, Oct 10, 2019 at 5:08 PM:

> Hi Tison,
>
> I would say that as a first step, and until we see that the interfaces
> we introduce cover all intended purposes, we keep the Executors
> non-public.
> From the previous discussion, I think that in general the Clients are
> independent from the Executors, as the Executors simply use the
> clients to submit jobs and return a cluster client.
>
> Cheers,
> Kostas

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

Kostas Kloudas-4
Yes, it is a JobClient.

Cheers,
Kostas
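
To make the answer above concrete, here is a minimal Java sketch of an Executor whose execute() is asynchronous and hands back a job-level client rather than a cluster-level one. Everything in it is an assumption made for illustration: ExecutorSketch, InProcessExecutor, the String pipeline, the Map of execution options and the two JobClient methods are hypothetical stand-ins, not the actual Flink interfaces and not the final FLIP-73 or FLIP-74 API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins only; none of these are the real Flink types.
final class ExecutorSketch {

    /** Handle scoped to a single submitted job (stand-in for the JobClient discussed in the thread). */
    interface JobClient {
        String getJobId();
        CompletableFuture<String> getJobResult();
    }

    /** Async submission with per-pipeline options; returns a job-level client. */
    interface Executor {
        CompletableFuture<JobClient> execute(String pipeline, Map<String, String> executionOptions);
    }

    /** Toy executor that "runs" the pipeline in-process and completes immediately. */
    static final class InProcessExecutor implements Executor {
        private int submitted = 0;

        @Override
        public CompletableFuture<JobClient> execute(String pipeline, Map<String, String> executionOptions) {
            final String jobId = "job-" + (++submitted);
            final String result = pipeline + " finished with parallelism "
                    + executionOptions.getOrDefault("parallelism", "1");
            JobClient client = new JobClient() {
                @Override
                public String getJobId() {
                    return jobId;
                }

                @Override
                public CompletableFuture<String> getJobResult() {
                    return CompletableFuture.completedFuture(result);
                }
            };
            return CompletableFuture.completedFuture(client);
        }
    }

    public static void main(String[] args) {
        Executor executor = new InProcessExecutor();
        Map<String, String> options = new HashMap<>();
        options.put("parallelism", "4");

        // Submission itself does not wait for the job to finish.
        JobClient client = executor.execute("word-count", options).join();

        // "Attached" behaviour: the caller decides to block on the result.
        // "Detached" behaviour would simply skip this join().
        System.out.println(client.getJobId() + ": " + client.getJobResult().join());
    }
}
```

In this sketch the same executor instance can be reused for several pipelines with different execution options, and the attached/detached distinction is left to the caller, which is the direction suggested earlier in the thread.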

On Thu, Oct 10, 2019 at 11:26 AM Zili Chen <[hidden email]> wrote:

>
> Thanks for your explanation Kostas.
>
> I agree that the Clients are independent of the Executors. One thing
> I wonder from your text: does Executor#execute return a cluster
> client or a job client? As discussed previously, I think
> conceptually it is a job client.
>
> Best,
> tison.
>
>
> Kostas Kloudas <[hidden email]> 于2019年10月10日周四 下午5:08写道:
>
> > Hi Tison,
> >
> > I would say that as a first step, and until we see that the interfaces
> > we introduce cover all intended purposes, we keep the Executors
> > non-public.
> > From the previous discussion, I think that in general the Clients are
> > independent from the Executors, as the Executors simply use the
> > clients to submit jobs and return a cluster client.
> >
> > Cheers,
> > Kostas
> >
> > On Wed, Oct 9, 2019 at 7:01 PM Zili Chen <[hidden email]> wrote:
> > >
> > > Hi Kostas & Aljoscha,
> > >
> > > I'm drafting a plan exposing multi-layered clients. It is mainly about
> > > how we distinguish different layers and what clients we're going to
> > > expose.
> > >
> > > In FLIP-73 scope I'd like to ask a question that whether or not Executor
> > > becomes a public interface that can be made use of by downstream
> > > project developer? Or it just an internal concept for unifying job
> > > submission?
> > > If it is the latter, I'm feeling multi-layer client topic is totally
> > > independent from
> > > Executor.
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Thomas Weise <[hidden email]> 于2019年10月5日周六 上午12:17写道:
> > >
> > > > It might be useful to mention on FLIP-73 that the intention for
> > > > Executor.execute is to be an asynchronous API once it becomes public
> > and
> > > > also refer to FLIP-74 as such.
> > > >
> > > >
> > > > On Fri, Oct 4, 2019 at 2:52 AM Aljoscha Krettek <[hidden email]>
> > > > wrote:
> > > >
> > > > > Hi Tison,
> > > > >
> > > > > I agree, for now the async Executor.execute() is an internal detail
> > but
> > > > > during your work for FLIP-74 it will probably also reach the public
> > API.
> > > > >
> > > > > Best,
> > > > > Aljoscha
> > > > >
> > > > > > On 4. Oct 2019, at 11:39, Zili Chen <[hidden email]> wrote:
> > > > > >
> > > > > > Hi Aljoscha,
> > > > > >
> > > > > > After clearly narrow the scope of this FLIP it looks good to me the
> > > > > > interface
> > > > > > Executor and its discovery so that I'm glad to see the vote thread.
> > > > > >
> > > > > > As you said, we should still discuss on implementation details but
> > I
> > > > > don't
> > > > > > think
> > > > > > it should be a blocker of the vote thread because a vote means we
> > > > > generally
> > > > > > agree on the motivation and overall design.
> > > > > >
> > > > > > As for Executor.execute() to be async, it is much better than we
> > keep
> > > > the
> > > > > > difference between sync/async in this level. But I'd like to note
> > that
> > > > it
> > > > > > only
> > > > > > works internally for now because user-facing interface is still
> > > > > env.execute
> > > > > > which block and return a JobExecutionResult. I'm afraid that there
> > are
> > > > > > several
> > > > > > people depends on the result for doing post execution process,
> > although
> > > > > it
> > > > > > doesn't
> > > > > > work on current per-job mode.
> > > > > >
> > > > > > Best,
> > > > > > tison.
> > > > > >
> > > > > >
> > > > > > Aljoscha Krettek <[hidden email]> 于2019年10月4日周五 下午4:40写道:
> > > > > >
> > > > > >> Do you all think we could agree on the basic executor primitives
> > and
> > > > > start
> > > > > >> voting on this FLIP? There are still some implementation details
> > but I
> > > > > >> think we can discuss/tackle them when we get to them and the
> > various
> > > > > people
> > > > > >> implementing this should be in close collaboration.
> > > > > >>
> > > > > >> Best,
> > > > > >> Aljoscha
> > > > > >>
> > > > > >>> On 4. Oct 2019, at 10:15, Aljoscha Krettek <[hidden email]>
> > > > > wrote:
> > > > > >>>
> > > > > >>> Hi,
> > > > > >>>
> > > > > >>> I think the end goal is to have only one environment per API,
> > but I
> > > > > >> think we won’t be able to achieve that in the short-term because
> > of
> > > > > >> backwards compatibility. This is most notable with the context
> > > > > environment,
> > > > > >> preview environments etc.
> > > > > >>>
> > > > > >>> To keep this FLIP very slim we can make this only about the
> > executors
> > > > > >> and executor discovery. Anything else like job submission
> > semantics,
> > > > > >> detached mode, … can be tackled after this. If we don’t focus I’m
> > > > afraid
> > > > > >> this will drag on for quite a while.
> > > > > >>>
> > > > > >>> One thing I would like to propose to make this easier is to
> > change
> > > > > >> Executor.execute() to return a CompletableFuture and to completely
> > > > > remove
> > > > > >> the “detached” logic from ClusterClient. That way, the new
> > components
> > > > > make
> > > > > >> no distinction between “detached” and “attached” but we can still
> > do
> > > > it
> > > > > in
> > > > > >> the CLI (via the ContextEnvironment) to support the existing
> > > > “detached”
> > > > > >> behaviour of the CLI that users expect. What do you think about
> > this?
> > > > > >>>
> > > > > >>> Best,
> > > > > >>> Aljoscha
> > > > > >>>
> > > > > >>>> On 3. Oct 2019, at 10:03, Zili Chen <[hidden email]>
> > wrote:
> > > > > >>>>
> > > > > > >>>> Thanks, Kostas, for your explanation clarifying the subtasks under
> > > > > > >>>> FLIP-73.
> > > > > > >>>>
> > > > > > >>>> As you described, changes to the Environments are included in this
> > > > > > >>>> FLIP. For "each API to have a single Environment", it would help to
> > > > > > >>>> describe which APIs we'd like to have after FLIP-73. And if we keep
> > > > > > >>>> multiple Environments, shall we keep the way the context environment
> > > > > > >>>> is injected for each API?
> > > > > >>>>
> > > > > >>>>
> > > > > > >>>> Kostas Kloudas <[hidden email]> wrote on Thu, Oct 3, 2019 at 1:44 PM:
> > > > > >>>>
> > > > > >>>>> Hi Tison,
> > > > > >>>>>
> > > > > > >>>>> The changes that this FLIP proposes are:
> > > > > >>>>> - the introduction of the Executor interface
> > > > > >>>>> - the fact that everything in the current state of job
> > submission
> > > > in
> > > > > >>>>> Flink can be defined through configuration parameters
> > > > > >>>>> - implementation of Executors that do not change any of the
> > > > semantics
> > > > > >>>>> of the currently offered "modes" of job submission
> > > > > >>>>>
> > > > > >>>>> In this, and in the FLIP itself where the
> > > > > >>>>> ExecutionEnvironment.execute() method is described, there are
> > > > details
> > > > > >>>>> about parts of the
> > > > > >>>>> integration with the existing Flink code-base.
> > > > > >>>>>
> > > > > > >>>>> So I am not sure what you mean by making the "integration a
> > > > > >>>>> follow-up discussion".
> > > > > >>>>>
> > > > > >>>>> Cheers,
> > > > > >>>>> Kostas
> > > > > >>>>>
> > > > > >>>>> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen <[hidden email]
> > >
> > > > > wrote:
> > > > > >>>>>>
> > > > > >>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal
> > to the
> > > > > > >>>>>> Executors work, as they are using the execute() method because
> > > > this
> > > > > is
> > > > > >>>>>> the only "entry" to the user program. To this regard, I
> > believe we
> > > > > >>>>>> should just see the fact that they have their dedicated
> > > > environment
> > > > > as
> > > > > >>>>>> an "implementation detail".
> > > > > >>>>>>
> > > > > >>>>>> The proposal says
> > > > > >>>>>>
> > > > > >>>>>> In this document, we propose to abstract away from the
> > > > Environments
> > > > > >> the
> > > > > >>>>> job
> > > > > >>>>>> submission logic and put it in a newly introduced Executor.
> > This
> > > > > will
> > > > > >>>>>> allow *each
> > > > > >>>>>> API to have a single Environment* which, based on the provided
> > > > > >>>>>> configuration, will decide which executor to use, *e.g.* Yarn,
> > > > > Local,
> > > > > >>>>> etc.
> > > > > >>>>>> In addition, it will allow different APIs and downstream
> > projects
> > > > to
> > > > > >>>>> re-use
> > > > > >>>>>> the provided executors, thus limiting the amount of code
> > > > duplication
> > > > > >> and
> > > > > >>>>>> the amount of code that has to be written.
> > > > > >>>>>>
> > > > > > >>>>>> Note that "This will allow *each API to have a single Environment*"
> > > > > > >>>>>> seems to diverge a bit from your statement above. Or do we treat a
> > > > > > >>>>>> single Environment as a possible advantage after the introduction of
> > > > > > >>>>>> Executor, so that we exclude it from this pass?
> > > > > >>>>>>
> > > > > >>>>>> Best,
> > > > > >>>>>> tison.
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > > >>>>>> Zili Chen <[hidden email]> wrote on Thu, Oct 3, 2019 at 2:07 AM:
> > > > > >>>>>>
> > > > > >>>>>>> BTW, correct me if I misunderstand, now I learn more about
> > our
> > > > > >>>>> community
> > > > > >>>>>>> way. Since FLIP-73 aimed at introducing an interface with
> > > > community
> > > > > >>>>>>> consensus the discussion is more about the interface in
> > order to
> > > > > >>>>> properly
> > > > > >>>>>>> define a useful and extensible API. The integration story
> > could
> > > > be
> > > > > a
> > > > > >>>>>>> follow up
> > > > > >>>>>>> since this one does not affect current behavior at all.
> > > > > >>>>>>>
> > > > > >>>>>>> Best,
> > > > > >>>>>>> tison.
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > > >>>>>>> Zili Chen <[hidden email]> wrote on Thu, Oct 3, 2019 at 2:02 AM:
> > > > > >>>>>>>
> > > > > >>>>>>>> Hi Kostas,
> > > > > >>>>>>>>
> > > > > > >>>>>>>> It seems to do no harm to have a configuration parameter on
> > > > > > >>>>>>>> Executor#execute, since we can merge it with the configuration
> > > > > > >>>>>>>> the Executor was created with and let the former override the latter.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I can see it is useful that, conceptually, we can create an Executor
> > > > > > >>>>>>>> for a series of jobs submitted to the same cluster, but with a
> > > > > > >>>>>>>> different job configuration per pipeline.
> > > > > >>>>>>>>
> > > > > >>>>>>>> Best,
> > > > > >>>>>>>> tison.
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > > >>>>>>>> Kostas Kloudas <[hidden email]> wrote on Thu, Oct 3, 2019 at 1:37 AM:
> > > > > >>>>>>>>
> > > > > >>>>>>>>> Hi again,
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> I did not include this to my previous email, as this is
> > related
> > > > > to
> > > > > >>>>> the
> > > > > >>>>>>>>> proposal on the FLIP itself.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> In the existing proposal, the Executor interface is the
> > > > > following.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> public interface Executor {
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> JobExecutionResult execute(Pipeline pipeline) throws
> > Exception;
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> }
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> This implies that all the necessary information for the
> > > > execution
> > > > > >> of
> > > > > >>>>> a
> > > > > >>>>>>>>> Pipeline should be included in the Configuration passed in
> > the
> > > > > >>>>>>>>> ExecutorFactory which instantiates the Executor itself.
> > This
> > > > > should
> > > > > >>>>>>>>> include, for example, all the parameters currently
> > supplied by
> > > > > the
> > > > > >>>>>>>>> ProgramOptions, which are conceptually not executor
> > parameters
> > > > > but
> > > > > >>>>>>>>> rather parameters for the execution of the specific
> > pipeline.
> > > > To
> > > > > >> this
> > > > > >>>>>>>>> end, I would like to propose a change in the current
> > Executor
> > > > > >>>>>>>>> interface showcased below:
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> public interface Executor {
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> JobExecutionResult execute(Pipeline pipeline, Configuration
> > > > > >>>>>>>>> executionOptions) throws Exception;
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> }
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> The above will allow to have the Executor specific options
> > > > passed
> > > > > >> in
> > > > > >>>>>>>>> the configuration given during executor instantiation,
> > while
> > > > the
> > > > > >>>>>>>>> pipeline specific options can be passed in the
> > > > executionOptions.
> > > > > >> As a
> > > > > >>>>>>>>> positive side-effect, this will make Executors re-usable,
> > i.e.
> > > > > >>>>>>>>> instantiate an executor and use it to execute multiple
> > > > pipelines,
> > > > > >> if
> > > > > >>>>>>>>> in the future we choose to do so.
> > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Let me know what you think,
> > > > > >>>>>>>>> Kostas
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <
> > > > > [hidden email]
> > > > > >>>
> > > > > >>>>>>>>> wrote:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Hi all,
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> I agree with Tison that we should disentangle threads so
> > that
> > > > > >>>>> people
> > > > > >>>>>>>>>> can work independently.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> For FLIP-73:
> > > > > >>>>>>>>>> - for Preview/OptimizedPlanEnv: I think they are
> > orthogonal to
> > > > > the
> > > > > > >>>>>>>>>> Executors work, as they are using the execute() method
> > because
> > > > > >>>>> this is
> > > > > >>>>>>>>>> the only "entry" to the user program. To this regard, I
> > > > believe
> > > > > we
> > > > > >>>>>>>>>> should just see the fact that they have their dedicated
> > > > > >>>>> environment as
> > > > > >>>>>>>>>> an "implementation detail".
> > > > > >>>>>>>>>> - for getting rid of the per-job mode: as a first note,
> > there
> > > > > was
> > > > > >>>>>>>>>> already a discussion here:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>
> > > > > >>
> > > > >
> > > >
> > https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> > > > > >>>>>>>>>> with many people, including myself, expressing their
> > opinion.
> > > > I
> > > > > am
> > > > > >>>>>>>>>> mentioning that to show that this topic already has some
> > > > history
> > > > > >>>>> and
> > > > > > >>>>>>>>>> the discussion does not start from scratch but there are
> > > > already
> > > > > >>>>> some
> > > > > >>>>>>>>>> contradicting opinions. My opinion is that we should not
> > get
> > > > rid
> > > > > >> of
> > > > > >>>>>>>>>> the per-job mode but I agree that we should discuss about
> > the
> > > > > >>>>>>>>>> semantics in more detail. Although in terms of code it
> > may be
> > > > > >>>>> tempting
> > > > > >>>>>>>>>> to "merge" the two submission modes, one of the main
> > benefits
> > > > of
> > > > > >>>>> the
> > > > > >>>>>>>>>> per-job mode is isolation, both for resources and
> > security, as
> > > > > the
> > > > > >>>>>>>>>> jobGraph to be executed is fixed and the cluster is
> > "locked"
> > > > > just
> > > > > >>>>> for
> > > > > >>>>>>>>>> that specific graph. This would be violated by having a
> > > > session
> > > > > > >>>>>>>>>> cluster launched and having all the infrastructure (ports
> > and
> > > > > > >>>>>>>>>> endpoints) set for submitting any job to that cluster.
> > > > > >>>>>>>>>> - for getting rid of the "detached" mode: I agree with
> > getting
> > > > > rid
> > > > > >>>>> of
> > > > > >>>>>>>>>> it but this implies some potential user-facing changes
> > that
> > > > > should
> > > > > >>>>> be
> > > > > >>>>>>>>>> discussed.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Given the above, I think that:
> > > > > >>>>>>>>>> 1) in the context of FLIP-73 we should not change any
> > > > semantics
> > > > > >> but
> > > > > >>>>>>>>>> simply push the existing submission logic behind a
> > reusable
> > > > > >>>>>>>>>> abstraction and make it usable via public APIs, as
> > Aljoscha
> > > > > said.
> > > > > >>>>>>>>>> 2) as Till said, changing the semantics is beyond the
> > scope of
> > > > > >> this
> > > > > >>>>>>>>>> FLIP and as Tison mentioned we should work towards
> > decoupling
> > > > > >>>>>>>>>> discussions rather than the opposite. So let's discuss
> > about
> > > > the
> > > > > >>>>>>>>>> future of the per-job and detached modes in a separate
> > thread.
> > > > > >> This
> > > > > >>>>>>>>>> will also allow to give the proper visibility to such an
> > > > > important
> > > > > >>>>>>>>>> topic.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Cheers,
> > > > > >>>>>>>>>> Kostas
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <
> > > > [hidden email]>
> > > > > >>>>> wrote:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Thanks for your thoughts Aljoscha.
> > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Another question, since FLIP-73 might contain refactorings of the
> > > > > > >>>>>>>>>>> Environment: shall we support something like PreviewPlanEnvironment?
> > > > > > >>>>>>>>>>> If so, how? From a user perspective, a preview plan is useful
> > > > > > >>>>>>>>>>> because it gives a visual view for modifying and configuring the
> > > > > > >>>>>>>>>>> topology without submitting it.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Best,
> > > > > >>>>>>>>>>> tison.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Aljoscha Krettek <[hidden email]> wrote on Wed, Oct 2, 2019 at 10:10 PM:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>> I agree with Till that we should not change the
> > semantics of
> > > > > >>>>>>>>> per-job mode.
> > > > > >>>>>>>>>>>> In my opinion per-job mode means that the cluster
> > > > (JobManager)
> > > > > >>>>> is
> > > > > >>>>>>>>> brought
> > > > > >>>>>>>>>>>> up with one job and it only executes that one job. There
> > > > > >>>>> should be
> > > > > >>>>>>>>> no open
> > > > > >>>>>>>>>>>> ports/anything that would allow submitting further jobs.
> > > > This
> > > > > >>>>> is
> > > > > >>>>>>>>> very
> > > > > >>>>>>>>>>>> important for deployments in docker/Kubernetes or other
> > > > > > >>>>>>>>> environments where
> > > > > >>>>>>>>>>>> you bring up jobs without necessarily having the notion
> > of a
> > > > > >>>>> Flink
> > > > > >>>>>>>>> cluster.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> What this means for a user program that has multiple
> > > > execute()
> > > > > >>>>>>>>> calls is
> > > > > >>>>>>>>>>>> that you will get a fresh cluster for each execute call.
> > > > This
> > > > > >>>>> also
> > > > > >>>>>>>>> means,
> > > > > >>>>>>>>>>>> that further execute() calls will only happen if the
> > > > “client”
> > > > > >>>>> is
> > > > > >>>>>>>>> still
> > > > > >>>>>>>>>>>> alive, because it is the one driving execution.
> > Currently,
> > > > > this
> > > > > >>>>>>>>> only works
> > > > > >>>>>>>>>>>> if you start the job in “attached” mode. If you start in
> > > > > >>>>>>>>> “detached” mode
> > > > > >>>>>>>>>>>> only the first execute() will happen and the rest will
> > be
> > > > > >>>>> ignored.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> This brings us to the tricky question about what to do
> > about
> > > > > >>>>>>>>> “detached”
> > > > > >>>>>>>>>>>> and “attached”. In the long run, I would like to get
> > rid of
> > > > > the
> > > > > >>>>>>>>> distinction
> > > > > >>>>>>>>>>>> and leave it up to the user program, by either blocking
> > or
> > > > not
> > > > > >>>>> on
> > > > > >>>>>>>>> the
> > > > > >>>>>>>>>>>> Future (or JobClient or whatnot) that job submission
> > > > returns.
> > > > > >>>>> This,
> > > > > >>>>>>>>>>>> however, means that users cannot simply request
> > “detached”
> > > > > >>>>>>>>> execution when
> > > > > >>>>>>>>>>>> using bin/flink, the user program has to “play along”.
> > On
> > > > the
> > > > > >>>>>>>>> other hand,
> > > > > >>>>>>>>>>>> “detached” mode is quite strange for the user program.
> > The
> > > > > >>>>>>>>> execute() call
> > > > > >>>>>>>>>>>> either returns with a proper job result after the job
> > ran
> > > > (in
> > > > > >>>>>>>>> “attached”
> > > > > >>>>>>>>>>>> mode) or with a dummy result (in “detached” mode) right
> > > > after
> > > > > >>>>>>>>> submission. I
> > > > > >>>>>>>>>>>> think this can even lead to weird cases where multiple
> > > > > >>>>> "execute()”
> > > > > >>>>>>>>> run in
> > > > > >>>>>>>>>>>> parallel. For per-job detached mode we also “throw” out
> > of
> > > > the
> > > > > >>>>>>>>> first
> > > > > >>>>>>>>>>>> execute so the rest (including result processing logic)
> > is
> > > > > >>>>> ignored.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> For this here FLIP-73 we can (and should) ignore these
> > > > > >>>>> problems,
> > > > > >>>>>>>>> because
> > > > > >>>>>>>>>>>> FLIP-73 only moves the existing submission logic behind
> > a
> > > > > >>>>> reusable
> > > > > >>>>>>>>>>>> abstraction and makes it usable via API. We should
> > closely
> > > > > >>>>> follow
> > > > > >>>>>>>>> up on the
> > > > > >>>>>>>>>>>> above points though because I think they are also
> > important.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Best,
> > > > > >>>>>>>>>>>> Aljoscha
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> On 2. Oct 2019, at 12:08, Zili Chen <
> > [hidden email]>
> > > > > >>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Thanks for your clarification Till.
> > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> I agree with the current semantics of the per-job mode: one should
> > > > > > >>>>>>>>>>>>> deploy a new cluster for each part of the job. Apart from the
> > > > > > >>>>>>>>>>>>> performance concern, it also means that PerJobExecutor actually
> > > > > > >>>>>>>>>>>>> knows how to deploy a cluster, which differs from the description
> > > > > > >>>>>>>>>>>>> that an Executor submits a job.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Anyway, it sounds workable and narrows the changes.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>
> > > > > >>>
> > > > > >>
> > > > > >>
> > > > >
> > > > >
> > > >
> >
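
The two interface ideas discussed in this thread (a per-call
executionOptions argument, as Kostas proposed, and an Executor.execute()
that returns a CompletableFuture, as Aljoscha proposed) can be sketched
together roughly as follows. This is only an illustrative sketch, not the
FLIP-73 interface: Pipeline, JobResult and InMemoryExecutor are
hypothetical stand-ins, a plain Map replaces Flink's Configuration, and
the option keys "target" and "parallelism" are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ExecutorSketch {

    /** Hypothetical stand-in for a Flink Pipeline; it only carries a name. */
    static final class Pipeline {
        final String name;
        Pipeline(String name) { this.name = name; }
    }

    /** Hypothetical stand-in for JobExecutionResult. */
    static final class JobResult {
        final String jobName;
        final Map<String, String> effectiveOptions;
        JobResult(String jobName, Map<String, String> effectiveOptions) {
            this.jobName = jobName;
            this.effectiveOptions = effectiveOptions;
        }
    }

    /** Assumed shape: per-call options plus an asynchronous result. */
    interface Executor {
        CompletableFuture<JobResult> execute(Pipeline pipeline,
                                             Map<String, String> executionOptions);
    }

    /** Toy executor that "runs" a pipeline by completing a future. */
    static final class InMemoryExecutor implements Executor {
        private final Map<String, String> executorOptions;

        InMemoryExecutor(Map<String, String> executorOptions) {
            // Defensive copy so the executor can be reused safely.
            this.executorOptions = new HashMap<>(executorOptions);
        }

        @Override
        public CompletableFuture<JobResult> execute(Pipeline pipeline,
                                                    Map<String, String> executionOptions) {
            // Merge: options given per pipeline override the executor-level ones.
            Map<String, String> merged = new HashMap<>(executorOptions);
            merged.putAll(executionOptions);
            return CompletableFuture.supplyAsync(() -> new JobResult(pipeline.name, merged));
        }
    }

    public static void main(String[] args) {
        Map<String, String> executorOptions = new HashMap<>();
        executorOptions.put("target", "local");
        executorOptions.put("parallelism", "1");
        Executor executor = new InMemoryExecutor(executorOptions);

        Map<String, String> perJob = new HashMap<>();
        perJob.put("parallelism", "4");

        // "Attached" behaviour: the caller blocks on the future.
        JobResult first = executor.execute(new Pipeline("word-count"), perJob).join();
        System.out.println(first.jobName + " parallelism="
                + first.effectiveOptions.get("parallelism"));

        // "Detached" behaviour: the caller registers a callback and moves on.
        // The same executor instance is reused with different per-job options.
        CompletableFuture<JobResult> pending =
                executor.execute(new Pipeline("page-rank"), new HashMap<>());
        CompletableFuture<Void> logged = pending.thenAccept(
                r -> System.out.println(r.jobName + " finished"));
        logged.join(); // only so this demo does not exit before the callback runs
    }
}
```

With this shape, "attached" versus "detached" becomes a decision of the
caller (block on the future or not) rather than a mode of the executor,
which is the separation suggested above for the CLI.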