Hi there,
I'm working on a Flink platform at my company, which provides a service to provision and manage multiple dedicated Flink clusters. The problem is that we want to sync a job's status without delay whenever it changes after the job has been submitted through our platform.

Since we want these updates to be timely and our services to be stateless, polling a job's status periodically is not a good solution. I have not found any proper way to achieve this by letting the JobManager push changes directly to our platform, short of changing the source code to register an additional `JobStatusListener` in the method `org.apache.flink.runtime.jobmaster.JobMaster#startScheduling`.

I wonder if we can enhance `JobStatusListener` a little so that a Flink user can register a custom JobStatusListener at startup.

To be specific, we can have a `JobStatusListenerFactory` interface and a corresponding `ServiceLoader<JobStatusListenerFactory>`, where the JobStatusListenerFactory has the following method:
- JobStatusListener createJobStatusListener(Properties properties);

Custom listeners will be created in the JobMaster#startScheduling method.

Anyone who wants to implement their own JobStatusListener would package all the related classes into a standalone jar with a `META-INF/services/org.apache.flink.runtime.executiongraph.JobStatusListener` file and place it under the `lib/` directory.

In addition, I found a Jira ticket similar to what I'm asking for, FLINK-17104, but I do not see any comments or updates on it yet. I hope someone can help me move this feature forward or give me some suggestions about it.

Thanks,
Wenhao
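For readers unfamiliar with the `ServiceLoader` pattern, here is a minimal, self-contained sketch of the factory proposal above. `StatusListener` and `StatusListenerFactory` are hypothetical stand-ins for `JobStatusListener` and the proposed `JobStatusListenerFactory` (Flink's real listener callback has a different, version-dependent signature), and `StatusListenerLoader` is an illustrative helper, not Flink code. One detail worth noting: `ServiceLoader.load(S.class)` reads a provider-configuration file named `META-INF/services/` followed by the fully qualified name of `S`, so a `ServiceLoader<JobStatusListenerFactory>` would look for a file named after the factory interface rather than after `JobStatusListener`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.ServiceLoader;

// Hypothetical stand-in for Flink's JobStatusListener (not its real signature).
interface StatusListener {
    void jobStatusChanges(String jobId, String newStatus, long timestamp);
}

// Hypothetical factory mirroring the proposed JobStatusListenerFactory method.
interface StatusListenerFactory {
    StatusListener createJobStatusListener(Properties properties);
}

final class StatusListenerLoader {
    private StatusListenerLoader() {}

    // Discovers factories on the classpath. ServiceLoader reads
    // META-INF/services/<fully qualified name of StatusListenerFactory>.
    static List<StatusListener> loadFromClasspath(Properties properties) {
        return createAll(ServiceLoader.load(StatusListenerFactory.class), properties);
    }

    // Builds one listener per factory; kept separate from discovery so it can
    // be exercised without a services file on the classpath.
    static List<StatusListener> createAll(
            Iterable<StatusListenerFactory> factories, Properties properties) {
        List<StatusListener> listeners = new ArrayList<>();
        for (StatusListenerFactory factory : factories) {
            listeners.add(factory.createJobStatusListener(properties));
        }
        return listeners;
    }
}
```

Splitting discovery (`loadFromClasspath`) from construction (`createAll`) is a design choice for this sketch only; it lets the same construction path serve `ServiceLoader` or any other source of factories.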
Hi,
I understand from your email that `StreamExecutionEnvironment.registerJobListener()` would not be enough for you because you want to be notified of changes on the cluster side, correct? That is, when the job status changes on the master.

Best,
Aljoscha

On 23.09.20 14:31, 季文昊 wrote:
> [...]
Hi Aljoscha,
Yes, that is not enough, since the `JobListener`s are called only once, when `execute()` or `executeAsync()` is called. To sync the status, I would also have to call `JobClient#getJobStatus` periodically.

On Fri, Sep 25, 2020 at 8:12 PM Aljoscha Krettek wrote:
> [...]
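The polling approach described here boils down to a loop like the following sketch, which forwards a status only when it differs from the last one seen. The `Supplier<String>` is a hypothetical stand-in for a call such as `JobClient#getJobStatus` (which returns a future in Flink's client API), and `ChangeDetectingPoller` is illustrative only. Every round costs a request even when nothing has changed.

```java
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Illustrative poller: asks a status source on every round and forwards
// only actual changes to the consumer.
final class ChangeDetectingPoller {
    private final Supplier<String> statusSource; // hypothetical stand-in for the cluster call
    private final Consumer<String> onChange;     // e.g. "update our platform's database"
    private String lastStatus;                   // null until the first poll
    private int polls;
    private int changes;

    ChangeDetectingPoller(Supplier<String> statusSource, Consumer<String> onChange) {
        this.statusSource = Objects.requireNonNull(statusSource);
        this.onChange = Objects.requireNonNull(onChange);
    }

    // One polling round; every call issues a request, changed or not.
    void pollOnce() {
        polls++;
        String current = statusSource.get();
        if (!Objects.equals(current, lastStatus)) {
            lastStatus = current;
            changes++;
            onChange.accept(current);
        }
    }

    int polls() { return polls; }
    int changes() { return changes; }
}
```

In practice `pollOnce` would be scheduled per job, for example with `ScheduledExecutorService#scheduleAtFixedRate`, so the number of requests grows with the number of jobs and the polling frequency rather than with the number of real status changes.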
Till or Chesnay (cc'ed), have you thought about adding a hook on the JobMaster/JobManager to allow external systems to get push notifications about submitted jobs?

If they are OK with such a feature, would you maybe be interested in implementing it yourself, Wenhao?

Best,
Aljoscha

On 2020/09/28 11:14, 季文昊 wrote:
> [...]
At the moment, this requirement has not come up very often. In general, I am always a bit cautious about adding functionality that executes user code in the JobManager, because it can easily become a stability problem. On the other hand, I can't think of any solution other than polling the job status right now.

Cheers,
Till

On Fri, Jan 8, 2021 at 1:23 PM Aljoscha Krettek wrote:
> [...]
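One common way to limit the kind of stability risk raised here is to isolate user listeners from the caller's code path. The following is a minimal sketch under stated assumptions, not code from Flink or from any proposal in this thread: `UserStatusListener` is a hypothetical callback shape, and the `Executor` is supplied by the caller. Each notification is wrapped in a try/catch so a throwing listener cannot propagate into the caller; with an asynchronous executor, a slow listener also does not block it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical callback shape (not Flink's exact JobStatusListener signature).
interface UserStatusListener {
    void jobStatusChanges(String jobId, String newStatus, long timestamp);
}

// Fans a status change out to user listeners. Exceptions thrown by a listener
// are caught and recorded instead of reaching the caller; an asynchronous
// executor additionally keeps slow listeners off the caller's thread.
final class IsolatingDispatcher {
    private final List<UserStatusListener> listeners;
    private final Executor executor;
    private final List<RuntimeException> failures = new ArrayList<>();

    IsolatingDispatcher(List<UserStatusListener> listeners, Executor executor) {
        this.listeners = List.copyOf(listeners);
        this.executor = executor;
    }

    void dispatch(String jobId, String newStatus, long timestamp) {
        for (UserStatusListener listener : listeners) {
            executor.execute(() -> {
                try {
                    listener.jobStatusChanges(jobId, newStatus, timestamp);
                } catch (RuntimeException e) {
                    // Errors such as OutOfMemoryError are deliberately not caught.
                    recordFailure(e);
                }
            });
        }
    }

    private synchronized void recordFailure(RuntimeException e) {
        failures.add(e); // a real integration would log this
    }

    synchronized List<RuntimeException> failures() {
        return new ArrayList<>(failures);
    }
}
```

This only addresses exceptions and latency; it does not protect against listeners that exhaust memory or threads, which is part of why running user code in the JobManager deserves a careful design.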
Hi Till,
IIUC, in application mode we already allow running user code in the JobManager.

Till Rohrmann wrote on Fri, Jan 8, 2021 at 9:53 PM:
> [...]

--
Best Regards

Jeff Zhang
Hi Till,

Indeed, there is currently no proper solution other than polling. It is painful to have such code in our platform, since keeping the polling running periodically consumes a lot of resources when there are hundreds of Flink clusters to maintain. Many of the polls are actually useless, as the job status seldom changes. Polling also means the status is not synchronized in time.

Aljoscha, can I submit a PR first so that we can review it and discuss whether it would introduce any stability problems or other risks?

Thanks,
Wenhao

On Sat, Jan 9, 2021 at 12:31 AM Jeff Zhang wrote:
> [...]
Hi Wenhao,
Aljoscha might not be as responsive as before. You can certainly create a POC to evaluate different approaches, but the outcome should be a design that we discuss before starting to implement the code properly. Just for your information, the community might be a bit busy at the moment with the upcoming feature freeze.

Cheers,
Till

On Wed, Mar 10, 2021 at 3:40 PM Wenhao Ji wrote:
> [...]
Hi Till,
Thanks for taking time out of your busy schedule. I have created a POC for this feature. The change to the Flink source code would look like this commit:
https://github.com/predatorray/flink/commit/2cab8bb1119162213632db984d2eb7529b8140e7

Generally, the idea is that custom `JobStatusListener`s are loaded through their `JobStatusListenerFactory` implementations using the `PluginManager`. They are created and initialized during the construction of the `ClusterEntrypoint`, and the `JobMaster` registers these listeners when it starts scheduling. With that in place, we can implement our own plugins.

I have also written an example JobStatusListener plugin, which simply prints the job status changes:
https://github.com/predatorray/flink-example-listener-plugin

I hope you will have time to review the code and the idea. Thanks again!

Wenhao

On Wed, Mar 10, 2021 at 11:23 PM Till Rohrmann wrote:
> [...]
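The lifecycle in the POC description (listeners created once when the cluster entrypoint starts, then registered by each job's master when scheduling begins) can be modeled in a few lines. Everything here is hypothetical: `PluginStatusListener` stands in for `JobStatusListener`, `ToyJobMaster` for `JobMaster`, and `PrintingStatusListener` is only a guess at what a plugin that simply prints job status changes might look like; the output format is invented. Plugin discovery through the `PluginManager` is omitted.

```java
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for JobStatusListener.
interface PluginStatusListener {
    void jobStatusChanges(String jobId, String newStatus, long timestamp);
}

// Hypothetical printing plugin; the message format is illustrative only.
final class PrintingStatusListener implements PluginStatusListener {
    private final PrintStream out;

    PrintingStatusListener(PrintStream out) {
        this.out = out;
    }

    @Override
    public void jobStatusChanges(String jobId, String newStatus, long timestamp) {
        out.println("Job " + jobId + " switched to " + newStatus + " at " + timestamp);
    }
}

// Toy model of the JobMaster side: it receives listeners that were created
// once at cluster startup and registers them before any transition happens.
final class ToyJobMaster {
    private final String jobId;
    private final List<PluginStatusListener> listeners = new ArrayList<>();

    ToyJobMaster(String jobId) {
        this.jobId = jobId;
    }

    void startScheduling(List<PluginStatusListener> clusterListeners, long timestamp) {
        listeners.addAll(clusterListeners); // register first so no transition is missed
        transitionTo("RUNNING", timestamp);
    }

    void transitionTo(String status, long timestamp) {
        for (PluginStatusListener listener : listeners) {
            listener.jobStatusChanges(jobId, status, timestamp);
        }
    }
}
```

Because the same listener instances are shared by every job on the cluster in this model, a real listener would need to be thread-safe if several jobs report transitions concurrently.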