[DISCUSS] Support registering custom JobStatusListeners when scheduling a job

zetaplusae
Hi there,

I'm working on a Flink platform at my company, which provides a service to
provision and manage multiple dedicated Flink clusters. The problem is that
we want to synchronize a job's status without delay whenever it changes
after the job has been submitted through our platform.

Since we want these updates to be timely and our services to stay
stateless, polling a job's status periodically is not a good solution. I
could not find a proper way to have the JobManager push changes directly to
our platform, other than changing the source code to register an additional
`JobStatusListener` in the method
`org.apache.flink.runtime.jobmaster.JobMaster#startScheduling`.
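To make the push model concrete, here is a minimal, self-contained sketch that does not depend on Flink. `JobStatus`, `JobStatusListener`, and `StatusSink` below are simplified, hypothetical stand-ins (Flink's real `jobStatusChanges` method takes a `JobID` rather than a string, and its full signature depends on the Flink version), and `PlatformNotifyingListener` is an illustrative name. The listener forwards each transition to the platform as soon as it is reported, so nothing has to poll.

```java
// Hypothetical, simplified stand-in for Flink's JobStatus enum (subset of values).
enum JobStatus { CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED }

// Hypothetical stand-in for org.apache.flink.runtime.executiongraph.JobStatusListener.
// The real interface uses a JobID and its parameters differ across Flink versions.
interface JobStatusListener {
    void jobStatusChanges(String jobId, JobStatus newJobStatus, long timestamp);
}

// Hypothetical sink representing the platform endpoint (e.g. an HTTP or message-queue client).
interface StatusSink {
    void publish(String message);
}

// Pushes every status transition to the platform as soon as the JobMaster reports it.
final class PlatformNotifyingListener implements JobStatusListener {
    private final StatusSink sink;

    PlatformNotifyingListener(StatusSink sink) {
        this.sink = sink;
    }

    @Override
    public void jobStatusChanges(String jobId, JobStatus newJobStatus, long timestamp) {
        sink.publish(jobId + " " + newJobStatus + " @" + timestamp);
    }
}
```

In a real deployment the sink would send the message over the network; keeping it behind an interface makes the listener easy to test in isolation.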

I wonder whether we could enhance `JobStatusListener` a little so that Flink
users can register their own custom `JobStatusListener`s at startup.

To be specific, we could add a `JobStatusListenerFactory` interface,
discovered through a corresponding `ServiceLoader<JobStatusListenerFactory>`,
with the following method:
 - `JobStatusListener createJobStatusListener(Properties properties);`

Custom listeners will be created in the `JobMaster#startScheduling`
method.

Anyone who wants to implement a custom `JobStatusListener` would package all
the related classes into a standalone jar containing a
`META-INF/services/org.apache.flink.runtime.executiongraph.JobStatusListener`
file and place it in the `lib/` directory.
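A minimal sketch of the proposed factory contract follows, again using hypothetical stand-in types instead of Flink's own. It shows discovery through `java.util.ServiceLoader` and the creation of one listener per factory from a shared `Properties` object; `CustomListeners` is an illustrative helper name. One detail worth noting: `ServiceLoader` reads the provider-configuration file named after the fully qualified name of the service type it is asked to load, so with `ServiceLoader<JobStatusListenerFactory>` the file under `META-INF/services/` would be named after the factory interface and would list factory implementation classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.ServiceLoader;

// Hypothetical stand-in for the listener interface (simplified signature).
interface JobStatusListener {
    void jobStatusChanges(String jobId, String newJobStatus, long timestamp);
}

// The proposed factory contract: build a listener from user-supplied configuration.
interface JobStatusListenerFactory {
    JobStatusListener createJobStatusListener(Properties properties);
}

final class CustomListeners {
    private CustomListeners() {}

    // Discovers factories listed in META-INF/services/<fully qualified name of JobStatusListenerFactory>
    // on the given class loader's classpath (e.g. jars placed in lib/).
    static List<JobStatusListener> loadFromClasspath(ClassLoader classLoader, Properties properties) {
        return createAll(ServiceLoader.load(JobStatusListenerFactory.class, classLoader), properties);
    }

    // Creates one listener per factory; ServiceLoader is itself an Iterable, so both paths share this.
    static List<JobStatusListener> createAll(Iterable<JobStatusListenerFactory> factories,
                                             Properties properties) {
        List<JobStatusListener> listeners = new ArrayList<>();
        for (JobStatusListenerFactory factory : factories) {
            listeners.add(factory.createJobStatusListener(properties));
        }
        return listeners;
    }
}
```

Separating discovery from creation keeps the creation logic testable without any jar on the classpath.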

In addition, I found a similar Jira ticket, FLINK-17104, but it has no
comments or updates yet. I hope someone can help me move this feature
forward or give me some suggestions about it.

Thanks,
Wenhao

Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Aljoscha Krettek-2
Hi,

I understand from your email that
`StreamExecutionEnvironment.registerJobListener()` would not be enough
for you because you want to be notified of changes on the cluster side,
correct? That is, when the job status changes on the master.

Best,
Aljoscha

On 23.09.20 14:31, 季文昊 wrote:


Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

zetaplusae
Hi Aljoscha,

Yes, that is not enough, since the `JobListener`s are called only once, when
`execute()` or `executeAsync()` is called. To keep the status in sync, I
would still have to call `JobClient#getJobStatus` periodically.
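For comparison, the polling workaround can be sketched as below. `StatusSource` is a hypothetical stand-in for Flink's `JobClient#getJobStatus()`, which returns a `CompletableFuture<JobStatus>`; here statuses are plain strings, and `ChangeDetectingPoller` is an illustrative name. Every round issues a request, but only a transition is reported, which shows why most polls are wasted and why a change can be observed up to one interval late.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical stand-in for JobClient#getJobStatus(), which is asynchronous in Flink.
interface StatusSource {
    CompletableFuture<String> getJobStatus();
}

// Polls one job and reports only real transitions. Not thread-safe: call from a single thread.
final class ChangeDetectingPoller {
    private final StatusSource source;
    // Receives (previous, current); previous is null on the first successful poll.
    private final BiConsumer<String, String> onChange;
    private String lastStatus;

    ChangeDetectingPoller(StatusSource source, BiConsumer<String, String> onChange) {
        this.source = source;
        this.onChange = onChange;
    }

    // One round: every call costs a request, even when nothing has changed.
    void pollOnce() {
        String current = source.getJobStatus().join();
        if (!current.equals(lastStatus)) {
            onChange.accept(lastStatus, current);
            lastStatus = current;
        }
    }

    // Runs pollOnce() at a fixed rate on one thread; a change is seen up to one interval late.
    ScheduledExecutorService start(long intervalMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                pollOnce();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs, so report it and retry next tick.
                System.err.println("Status poll failed: " + e);
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

The caller is expected to shut the returned scheduler down once the job reaches a terminal state.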

On Fri, Sep 25, 2020 at 8:12 PM Aljoscha Krettek <[hidden email]> wrote:


Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Aljoscha Krettek-2
Till or Chesnay (cc'ed), have you thought about adding a hook on the
JobMaster/JobManager to allow external systems to get push notifications
about submitted jobs?

If they are OK with such a feature, would you maybe be interested in
implementing it yourself, Wenhao?

Best,
Aljoscha

On 2020/09/28 11:14, 季文昊 wrote:


Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Till Rohrmann
At the moment, this requirement has not come up very often. In general, I
am always a bit cautious when adding functionality that executes user code
in the JobManager because it can easily become a stability problem. On the
other hand, I can't think of any solution other than polling the job
status atm.
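One way the stability concern could be reduced, sketched here as an assumption rather than taken from any existing Flink code, is to wrap every user-provided listener so that it runs on a separate executor and any failure it raises is caught and logged instead of reaching the JobMaster's main thread. `IsolatingListener` and the simplified `JobStatusListener` below are hypothetical.

```java
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in with a simplified signature.
interface JobStatusListener {
    void jobStatusChanges(String jobId, String newJobStatus, long timestamp);
}

// Wraps untrusted listener code: calls are handed to a separate executor and failures are contained.
final class IsolatingListener implements JobStatusListener {
    private static final Logger LOG = Logger.getLogger(IsolatingListener.class.getName());

    private final JobStatusListener delegate;
    private final Executor executor;

    IsolatingListener(JobStatusListener delegate, Executor executor) {
        this.delegate = delegate;
        this.executor = executor;
    }

    @Override
    public void jobStatusChanges(String jobId, String newJobStatus, long timestamp) {
        // The caller returns immediately; slow or blocking user code runs on the executor instead.
        executor.execute(() -> {
            try {
                delegate.jobStatusChanges(jobId, newJobStatus, timestamp);
            } catch (Exception | LinkageError e) {
                // Contain failures (including broken plugin classpaths) so scheduling is unaffected.
                LOG.log(Level.WARNING, "Custom JobStatusListener failed for job " + jobId, e);
            }
        });
    }
}
```

Giving each wrapped listener its own single-threaded executor would keep notifications in order while a slow listener only delays itself.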

Cheers,
Till

On Fri, Jan 8, 2021 at 1:23 PM Aljoscha Krettek <[hidden email]> wrote:


Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Jeff Zhang
Hi Till,

IIUC, in application mode we already allow user code to run in the JobManager.

Till Rohrmann <[hidden email]> wrote on Fri, Jan 8, 2021 at 9:53 PM:



--
Best Regards

Jeff Zhang

Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

zetaplusae
Hi Till. Indeed, there is currently no proper solution other than polling.
Having such code in our platform is painful: keeping the polling running
periodically consumes a lot of resources when there are hundreds of Flink
clusters to maintain. Most of the polls are actually useless, since job
statuses seldom change, and polling also prevents the status from being
synchronized in time.
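A back-of-the-envelope estimate makes the cost concrete. All inputs below are hypothetical example values (300 clusters with one job each, a 10-second polling interval, about 5 status changes per job per day), not measurements.

```java
// Back-of-the-envelope polling cost; every input is a hypothetical example value.
final class PollingCost {
    private PollingCost() {}

    // Number of status requests issued per day across all clusters.
    static long pollsPerDay(long clusters, long jobsPerCluster, long intervalSeconds) {
        long secondsPerDay = 24L * 60 * 60; // 86,400
        return clusters * jobsPerCluster * (secondsPerDay / intervalSeconds);
    }

    public static void main(String[] args) {
        long polls = pollsPerDay(300, 1, 10);   // 300 * 1 * 8,640 = 2,592,000 requests per day
        long usefulPolls = 300L * 1 * 5;        // roughly one useful poll per actual change: 1,500
        System.out.printf("polls/day=%d, useful=%d (%.3f%%)%n",
                polls, usefulPolls, 100.0 * usefulPolls / polls);
    }
}
```

Under these assumptions, a push-based listener would send on the order of 1,500 notifications per day instead of about 2.6 million requests, and each change would arrive immediately rather than up to 10 seconds late.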

Aljoscha, can I submit a PR first, so that we can review it and discuss
whether it would introduce any stability problems or other risks?

Thanks,
Wenhao

On Sat, Jan 9, 2021 at 12:31 AM Jeff Zhang <[hidden email]> wrote:


Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

Till Rohrmann
Hi Wenhao,

Aljoscha might not be as responsive as before. Of course, you can create a
POC to evaluate different approaches, but the outcome should be a design
that we discuss before starting to implement the code properly. Also, just
for your information, the community might be a bit busy at the moment with
the upcoming feature freeze.

Cheers,
Till

On Wed, Mar 10, 2021 at 3:40 PM Wenhao Ji <[hidden email]> wrote:


Re: [DISCUSS] Support registering custom JobStatusListeners when scheduling a job

zetaplusae
Hi Till,

Thanks for taking time out of your busy schedule.
I have created a POC for this feature.

The changes to the Flink source code are in this commit:
https://github.com/predatorray/flink/commit/2cab8bb1119162213632db984d2eb7529b8140e7
The general idea is that custom `JobStatusListener`s are created by their
`JobStatusListenerFactory` implementations, which are loaded through the
`PluginManager`. The listeners are created and initialized during the
construction of the `ClusterEntrypoint`, and the `JobMaster` registers them
when it starts scheduling.
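The lifecycle described above (factories loaded once when the cluster entrypoint is constructed, listeners registered by each JobMaster when it starts scheduling) could look roughly like the following. This is a hypothetical, self-contained sketch and not the code in the linked commit: `EntrypointListeners` and `JobMasterSketch` are illustrative names, the types are simplified stand-ins, and the `Iterator` of factories stands in for whatever Flink's `PluginManager` would discover.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

// Hypothetical stand-ins with simplified signatures.
interface JobStatusListener {
    void jobStatusChanges(String jobId, String newJobStatus, long timestamp);
}

interface JobStatusListenerFactory {
    JobStatusListener createJobStatusListener(Properties properties);
}

// Built once, when the (hypothetical) cluster entrypoint is constructed.
final class EntrypointListeners {
    private final List<JobStatusListener> listeners;

    EntrypointListeners(Iterator<JobStatusListenerFactory> factories, Properties config) {
        List<JobStatusListener> created = new ArrayList<>();
        while (factories.hasNext()) {
            created.add(factories.next().createJobStatusListener(config));
        }
        this.listeners = Collections.unmodifiableList(created);
    }

    List<JobStatusListener> listeners() {
        return listeners;
    }
}

// Each job's master registers the shared listeners when scheduling starts.
final class JobMasterSketch {
    private final String jobId;
    private final List<JobStatusListener> registered = new ArrayList<>();

    JobMasterSketch(String jobId) {
        this.jobId = jobId;
    }

    void startScheduling(EntrypointListeners custom) {
        registered.addAll(custom.listeners());
    }

    // Simplified: in Flink the execution graph drives transitions; here the caller triggers them.
    void transitionTo(String newStatus) {
        long now = System.currentTimeMillis();
        for (JobStatusListener listener : registered) {
            listener.jobStatusChanges(jobId, newStatus, now);
        }
    }
}
```

Creating the listeners once per cluster keeps plugin initialization off the per-job path. The trade-off is that the same listener instances receive events from every job, possibly from different threads, so they would need to be thread-safe.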

With this in place, we can implement our own plugins. I have also written an
example `JobStatusListener` plugin that simply prints job status changes:
https://github.com/predatorray/flink-example-listener-plugin

I hope you will have time to review the code and the idea.

Thanks again!

Wenhao

On Wed, Mar 10, 2021 at 11:23 PM Till Rohrmann <[hidden email]> wrote:
