(CC user list because I think users may have ideas on what per-job mode should look like)

Hi all,

In the discussion about Flink on k8s[1] we encountered a problem: opinions diverge on how the so-called per-job mode should work. This thread is aimed at starting a dedicated discussion about per-job semantics and how to implement them.

**The AS IS per-job mode**

* In standalone deployment, we bundle the user jar with the Flink jar, retrieve the JobGraph (the very first JobGraph from the user program on the classpath), and then start a Dispatcher with this JobGraph preconfigured, which launches it as a "recovered" job.

* In YARN deployment, we accept submission via CliFrontend, extract the JobGraph (the very first JobGraph from the submitted user program), serialize the JobGraph and upload it to YARN as a resource; when the AM starts, it retrieves the JobGraph from the resource and starts a Dispatcher with this JobGraph preconfigured, and the rest follows the standalone path.

Specifically, in order to support multi-part jobs, if a YARN deployment is configured as "attached", it starts a SessionCluster, runs the program, and shuts down the cluster when the job finishes.

**Motivation**

The implementation mentioned above, however, suffers from problems. The major two of them are: 1. only the very first JobGraph from the user program is respected; 2. the job is compiled on the client side.

1. Only the very first JobGraph from the user program is respected

There is already an issue about this topic[2]. Since we extract the JobGraph from the user program by hijacking Environment#execute, we actually abort any execution after the first call to #execute. Besides, it has surprised users many times that logic they write in the program is possibly never executed. The underlying problem here is the semantics of "job" from Flink's perspective. I'd say that in the current implementation "per-job" is actually "per-job-graph". However, in practice, since we support jar submission, what users want is "per-program" semantics.

2. The job is compiled on the client side

Well, standalone deployment is not affected by this problem.
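Problem 1 above can be illustrated with a self-contained plain-Java sketch (no Flink dependency; all names are illustrative stubs) of how hijacking Environment#execute aborts the user program after the first call:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the "per-job-graph" extraction: the first call to
// execute() hands the job graph to the deployer and aborts the rest of
// the user's main() via an exception.
public class PerJobHijackDemo {

    /** Thrown by the hijacked execute() to abort the user's main(). */
    static class ProgramAbortException extends RuntimeException {}

    /** Stand-in for an execution environment with a hijacked execute(). */
    static class Environment {
        final List<String> extractedJobGraphs = new ArrayList<>();

        void execute(String jobName) {
            extractedJobGraphs.add(jobName);   // "extract" the JobGraph
            throw new ProgramAbortException(); // abort further user code
        }
    }

    /** A user program that intends to run two jobs, one after the other. */
    static void userMain(Environment env) {
        env.execute("bootstrap-job");
        // Everything below is silently skipped under the current per-job mode.
        env.execute("main-job");
        System.out.println("post-processing");
    }

    /** Runs the user program the way per-job extraction does; returns the jobs seen. */
    public static List<String> extractJobs() {
        Environment env = new Environment();
        try {
            userMain(env);
        } catch (ProgramAbortException expected) {
            // Extraction stops at the first execute() call.
        }
        return env.extractedJobGraphs;
    }
}
```

Only "bootstrap-job" is ever seen by the cluster; "main-job" and the post-processing logic are silently dropped.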
But in YARN deployment, we compile the job and get the JobGraph on the client side, and then upload it to YARN. This approach, however, somewhat breaks isolation. We have observed user programs that contain exception-handling logic which calls System.exit in the main method, which causes compiling the job to exit the whole client process at once. This is a critical problem if we manage multiple Flink jobs in a unified platform; in this case, it shuts down the whole service.

Besides, I have been asked many times why per-job mode doesn't run "just like" session mode but with a dedicated cluster. This may imply that the current implementation mismatches users' demand.

**Proposal**

In order to provide a "per-program" semantic mode which acts "just like" session mode but with a dedicated cluster, I propose the workflow below. It is like starting a driver on the cluster, but it is not the general driver solution proposed here[3]; the main purpose of the workflow below is to provide a "per-program" semantic mode.

*From CliFrontend*

1. CliFrontend receives the submission, gathers all configuration, and starts a corresponding ClusterDescriptor.

2. ClusterDescriptor deploys a cluster with main class ProgramClusterEntrypoint while shipping resources including the user program.

3. ProgramClusterEntrypoint#main starts components including a Standalone Dispatcher, configures the user program to start an RpcClusterClient, and then invokes the main method of the user program.

4. RpcClusterClient acts like MiniClusterClient, which is able to submit the JobGraph after a leader is elected, so that we don't fall back to round-robin or fail the submission due to no leader.

5. Whether or not the job result is delivered depends on the user program's logic, since we can already get a JobClient from execute. ProgramClusterEntrypoint exits when the user program exits and all submitted jobs reach a globally terminal state.

This fits the direction of FLIP-73, because the strategy of starting an RpcClusterClient can be regarded as a special Executor.
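The proposed workflow (steps 3-5) can be sketched in self-contained plain Java. ProgramClusterEntrypoint and RpcClusterClient are names from this proposal, not existing Flink classes, and the Dispatcher here is a stub; the point is that the client waits for leader election instead of retrying, and the user program may submit any number of jobs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Sketch of the proposed entrypoint: start cluster components, hand the
// user program a local client, exit when the program returns.
public class ProgramClusterEntrypoint {

    /** Stub dispatcher whose leadership is granted asynchronously. */
    static class Dispatcher {
        final CompletableFuture<String> leaderAddress = new CompletableFuture<>();
        final List<String> submittedJobs = new ArrayList<>();

        void grantLeadership(String address) { leaderAddress.complete(address); }
    }

    /** Client that waits for an elected leader instead of failing fast. */
    static class RpcClusterClient {
        private final Dispatcher dispatcher;

        RpcClusterClient(Dispatcher dispatcher) { this.dispatcher = dispatcher; }

        /** Blocks until leadership is granted, then submits -- no retry loop, no fallback. */
        String submitJob(String jobGraph) {
            dispatcher.leaderAddress.join();
            dispatcher.submittedJobs.add(jobGraph);
            return jobGraph;
        }
    }

    /** Entrypoint: start components, then invoke the user program against them. */
    public static List<String> run(Consumer<RpcClusterClient> userMain) {
        Dispatcher dispatcher = new Dispatcher();
        RpcClusterClient client = new RpcClusterClient(dispatcher);
        // In reality leadership is granted asynchronously by leader election.
        dispatcher.grantLeadership("dispatcher-leader-address");
        userMain.accept(client); // user program submits as many jobs as it likes
        return dispatcher.submittedJobs; // entrypoint exits after user main returns
    }
}
```

Unlike the current extraction approach, both submissions below reach the dispatcher and the program runs to completion.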
After ProgramClusterEntrypoint#main starts the cluster, it generates and passes configuration to the user program so that when the Executor is generated, it knows to use an RpcClusterClient for submission and knows the address of the Dispatcher.

**Compatibility**

In my opinion this mode can be purely an add-on to the current codebase. We don't actually replace the current per-job mode with the so-called "per-program" mode. It may turn out that the current per-job mode becomes redundant once we have such a "per-program" mode, so we could possibly deprecate it in favor of the other.

I'm glad to discuss more details if you're interested, but we'd better first reach a consensus on the overall design :-)

Looking forward to your reply!

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-9953
[2] https://issues.apache.org/jira/browse/FLINK-10879
[3] https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#
Hi
Thanks for bringing this.

The design looks very nice to me in that
1. In the new per-job mode, we don't need to compile user programs in the client and can directly run user programs with user jars. That way, it's easier for resource isolation in multi-tenant platforms and is much safer.
2. The execution of user programs can be unified in session and per-job modes. In session mode, user jobs are submitted via a remote ClusterClient while in per-job mode user jobs are submitted via a local ClusterClient.

Regards,
Xiaogang

On Wed, 30 Oct 2019 at 15:30, tison <[hidden email]> wrote:
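Xiaogang's second point can be made concrete with a small sketch. The interface and the two stubbed clients below are illustrative (RestClusterClient is a real Flink class, but here it is only a stand-in): the user program is identical in both modes, and only the ClusterClient handed to it differs.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of unified execution: the same user program runs against either a
// remote (session mode) or a local (per-program mode) client.
public class ClusterClientUnificationDemo {

    public interface ClusterClient {
        String submitJob(String jobGraph);
    }

    /** Session mode: submit over the network to an already-running cluster. */
    public static class RestClusterClient implements ClusterClient {
        public String submitJob(String jobGraph) { return "rest:" + jobGraph; }
    }

    /** Per-program mode: submit locally to the Dispatcher in the same process. */
    public static class LocalClusterClient implements ClusterClient {
        public String submitJob(String jobGraph) { return "local:" + jobGraph; }
    }

    /** The user program does not change between the two modes. */
    public static List<String> userProgram(ClusterClient client) {
        List<String> receipts = new ArrayList<>();
        receipts.add(client.submitJob("job-1"));
        receipts.add(client.submitJob("job-2"));
        return receipts;
    }
}
```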
Hi,
Thanks for starting the discussion.

WRT the per-job semantics, it looks natural to me that per-job means per-job-graph, because in my understanding a JobGraph is the representation of a job. Could you share a use case in which a user program should contain multiple job graphs?

WRT the per-program mode, I'm also in favor of a unified cluster-side execution for user programs, so +1 from my side.

But I think there may be some value in the current per-job mode: we now have some common resources available on the client machine that are read by the main methods of user programs. If migrated to per-program mode, we must explicitly set the specific resources for each user program and ship them to the cluster, which would be a bit inconvenient. Also, as the job graph is compiled at the client, we can recognize errors caused by user code before starting the cluster, and easily get access to the logs.

Best,
Paul Lam

On 30 Oct 2019, at 16:22, SHI Xiaogang <[hidden email]> wrote:
Thanks for your attention!

@[hidden email] <[hidden email]>

Yes, correct. We try to avoid jobs affecting one another. Also, a local ClusterClient saves the overhead of retrying before a leader is elected and of persisting the JobGraph before submission (as RestClusterClient does), as well as the network cost.

@Paul Lam <[hidden email]>

1. There is already a note[1] about multi-part jobs. I was also a bit confused by this concept at first :-) Things go the same way if your program contains only one JobGraph, so per-program acts like per-job-graph in this case, which keeps compatibility for the many single-job-graph programs.

Besides, we have to respect the user program, which the current implementation doesn't, because we return abruptly when env#execute is called. This hijacks user control, so users cannot deal with the job result or its future. I think this is why we had to add a detach/attach option.

2. As for the compilation part, a workaround could be to upload those resources to a commonly known address such as HDFS, so that compilation can read them from either the client or the cluster.

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430

On Wed, 30 Oct 2019 at 22:41, Newport, Billy <[hidden email]> wrote:

> We execute multiple job graphs routinely because we cannot submit a single
> graph without it blowing up. I believe Regina spoke to this in Berlin during
> her talk. If we are processing a database ingestion with 200 tables, we
> instead do a job graph per table rather than a single job graph that does
> all tables. A single job graph can be in the tens of thousands of nodes in
> our largest cases, and we have found Flink (as of 1.3/1.6.4) cannot handle
> graphs of that size. We're currently testing 1.9.1 but have not retested the
> large graph scenario.
>
> Billy
>
> *From:* Paul Lam [mailto:[hidden email]]
> *Sent:* Wednesday, October 30, 2019 8:41 AM
> *To:* SHI Xiaogang
> *Cc:* tison; dev; user
> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
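Billy's per-table pattern can be sketched in plain Java. All names here are illustrative stubs; a real program would build and execute one small Flink job per table, which under the current per-job-graph extraction would stop after the first table:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// One job graph per ingested table, instead of one huge graph for all tables.
public class PerTableIngestion {

    /** Stub for "build and run one job graph for a single table". */
    static String runIngestionJob(String table) {
        return "ingest-" + table;
    }

    /** Submits one small job per table; per-job-graph mode would only see the first. */
    public static Map<String, String> ingestAll(List<String> tables) {
        Map<String, String> jobIds = new LinkedHashMap<>();
        for (String table : tables) {
            jobIds.put(table, runIngestionJob(table));
        }
        return jobIds;
    }
}
```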
Hi all,
Regarding the compilation part: I think there are upsides and downsides to building the Flink job (running the main method) on the client side. However, since this is the current way of doing it, we should have a very powerful reason to change the default behaviour.

While there is a possible workaround for reading local resources on the client (upload to HDFS), if this is a common pattern then it makes no sense to force users into this workaround.

If we want to introduce server-side building of jobs, we should in my opinion provide both options for users to choose from, and client side should probably stay the default.

Cheers,
Gyula

On Thu, Oct 31, 2019 at 2:18 AM tison <[hidden email]> wrote:
>> It happens that current per-job mode would be useless if we have such >> "per-program" mode so that we possibly deprecate it for preferring the >> other. >> >> I'm glad to discuss more into details if you're interested in, but let's >> say >> we'd better first reach a consensus on the overall design :-) >> >> Looking forward to your reply! >> >> Best, >> tison. >> >> [1] https://issues.apache.org/jira/browse/FLINK-9953 >> <https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D9953&d=DwMFoQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q&m=fT0zUrRT-N5XEE85dO3q03TkGf3bN1V3el5frnzSQsg&s=p428wH8eWmBwyjHaE0vClbGi51CQxgjJ6Js3X9Kyr04&e=> >> [2] https://issues.apache.org/jira/browse/FLINK-10879 >> <https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D10879&d=DwMFoQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q&m=fT0zUrRT-N5XEE85dO3q03TkGf3bN1V3el5frnzSQsg&s=mEzfvloedca1XW6pqI9LrR--IKhrkg-YmFMXRULqVSQ&e=> >> [3] >> https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit# >> <https://urldefense.proofpoint.com/v2/url?u=https-3A__docs.google.com_document_d_1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY_edit-23&d=DwMFoQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q&m=fT0zUrRT-N5XEE85dO3q03TkGf3bN1V3el5frnzSQsg&s=XNVcSV52D3KneNkZgP7tgo9Y4uBm0jsN0RfYaelP7JM&e=> >> >> >> >> ------------------------------ >> >> Your Personal Data: We may collect and process information about you that >> may be subject to data protection laws. For more information about how we >> use and disclose your personal data, how we protect your information, our >> legal basis to use your information, your rights and who you can contact, >> please refer to: www.gs.com/privacy-notices >> > |
Thanks tison for starting this exciting discussion. We also suffer a lot
from the per-job mode.

I think a per-job cluster is a dedicated cluster for only one job that
will not accept other jobs. It has the advantage of one-step submission:
we do not need to start a dispatcher first and then submit the job. And
it does not matter where the job graph is generated and the job is
submitted. Now we have two cases.

(1) The current YARN detached cluster. The job graph is generated on the
client and then shipped to the Flink master container via the distributed
cache. The MiniDispatcher uses a FileJobGraphRetriever to get it. The job
is submitted on the Flink master side.

(2) The standalone per-job cluster. User jars are already built into the
image, so the job graph is generated on the Flink master side and a
ClasspathJobGraphRetriever is used to get it. The job is also submitted
on the Flink master side.

In both (1) and (2), only one job per user program is supported. Per-job
means per-job-graph, so it works just as expected.

Tison suggests adding a new mode, "per-program". The user jar is
transferred to the Flink master container, and a local client is started
there to generate the job graph and submit the job. I think it could
cover all the functionality of the current per-job mode, both (1) and
(2). The detached and attached modes could also be unified, and we would
no longer need to start a session cluster to simulate per-job for
multi-part programs.

I am in favor of the "per-program" mode. Just two concerns:
1. How many users are running multiple jobs in one program?
2. Why not always use a session cluster to simulate per-job? Maybe
one-step submission is a convincing reason.

Best,
Yang

tison <[hidden email]> wrote on Thu, Oct 31, 2019, 9:18 AM:

> Thanks for your attention!
>
> @Xiaogang: Yes, correct. We try to avoid jobs affecting one another.
> A local ClusterClient also saves the overhead of retrying before a
> leader is elected and of persisting the JobGraph before submission in
> RestClusterClient, as well as the network cost.
>
> @Paul Lam:
>
> 1. There is already a note[1] about multi-part jobs. I was also a bit
> confused by this concept at first :-) Things work in a similar way if
> your program contains only one JobGraph, so per-program acts like
> per-job-graph in that case, which provides compatibility for the many
> single-job-graph programs.
>
> Besides, we have to respect the user program, which the current
> implementation doesn't: we return abruptly when env#execute is called,
> hijacking user control, so that users cannot deal with the job result
> or its future. I think this is why we had to add a detach/attach
> option.
>
> 2. For the compilation part, a workaround could be to upload those
> resources to a commonly known address such as HDFS, so that compilation
> can read them from either the client or the cluster.
>
> Best,
> tison.
>
> [1] https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430
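To make concrete why everything after the first execute() call is skipped in the current per-job mode, here is a minimal, self-contained Java simulation of the "hijacked execute" technique discussed above (plain Java with illustrative class names, not Flink's actual API): the environment captures the first plan and then aborts the user's main() by throwing, so any logic after the first execute() never runs.

```java
import java.util.ArrayList;
import java.util.List;

public class HijackedExecuteDemo {

    /** Thrown by the hijacking environment to abort the user program. */
    static class ProgramAbortException extends RuntimeException {}

    /** Stand-in for an execution environment whose execute() is hijacked. */
    static class CapturingEnvironment {
        String capturedPlan;  // the one and only "JobGraph" that is kept

        void execute(String plan) {
            this.capturedPlan = plan;
            throw new ProgramAbortException();  // aborts the user's main()
        }
    }

    /** A user program that (perhaps unknowingly) defines two jobs. */
    static void userMain(CapturingEnvironment env, List<String> log) {
        env.execute("job-graph-for-table-A");
        log.add("between executes");          // never reached
        env.execute("job-graph-for-table-B"); // never reached
    }

    public static void main(String[] args) {
        CapturingEnvironment env = new CapturingEnvironment();
        List<String> log = new ArrayList<>();
        try {
            userMain(env, log);
        } catch (ProgramAbortException expected) {
            // swallowed by the framework: graph extraction is "done"
        }
        System.out.println("captured: " + env.capturedPlan);
        System.out.println("statements run after first execute: " + log.size());
        // prints: captured: job-graph-for-table-A
        //         statements run after first execute: 0
    }
}
```

This is exactly the surprise described in the thread: the second job graph and any result-handling code between the two calls silently disappear.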
Hi all,
we're making heavy use of multiple jobs in one program, and this is why:
we fetch data from a huge number of sources and, for each source, apply
some transformation, then write the union of all outputs into a single
directory (this assumes batch). When the number of sources is large,
doing this in a single job makes the graph very big, and this is a
problem for several reasons:

- too many subtasks/threads per slot
- increased back pressure
- if a single "sub-job" fails, the whole job fails; this is very
  annoying if it happens after half a day, for example
- in our use case, the big-graph mode takes much longer than running
  each job separately (but maybe this is only true if you don't have
  much hardware)
- debugging the cause of a failure can become a daunting task if the
  job graph is too large
- we faced many strange errors when trying to run the single big-job
  mode (due to serialization corruption)

So, summarizing, our overall experience with Flink batch is: the simpler
the job graph, the better!

Best,
Flavio
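The one-small-job-per-source pattern Billy and Flavio describe can be sketched in plain Java as follows. `JobSubmitter` is a hypothetical stand-in for whatever actually runs a job (in Flink it would be an env.execute() per table); the point is that each table gets its own independent submission, so one failure does not take down the rest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PerTableJobs {

    /** Hypothetical submission hook; in Flink this would be env.execute(). */
    interface JobSubmitter {
        void submit(String jobName) throws Exception;
    }

    /** Runs one independent job per table; a failure doesn't stop the rest. */
    static List<String> runPerTable(List<String> tables, JobSubmitter submitter) {
        List<String> failed = new ArrayList<>();
        for (String table : tables) {
            try {
                submitter.submit("ingest-" + table);
            } catch (Exception e) {
                failed.add(table);  // record the failure, continue with next table
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> tables = Arrays.asList("orders", "customers", "items");
        List<String> failed = runPerTable(tables, jobName -> {
            if (jobName.contains("customers")) {
                throw new Exception("simulated job failure");
            }
            System.out.println("finished " + jobName);
        });
        System.out.println("failed tables: " + failed);
        // prints: finished ingest-orders
        //         finished ingest-items
        //         failed tables: [customers]
    }
}
```

Note that this whole loop only works on a cluster if the mode has "per-program" semantics; under the current per-job extraction, only the first table's job would ever run.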
In reply to this post by tison
Hi all,
Firstly, thanks @tison for bringing this up, and a strong +1 for the overall design.

I'd like to add one more example of "multiple jobs in one program" from what I'm currently working on. I'm trying to run a TPC-DS benchmark (including tens of SQL query jobs) on Flink, and I'm suffering a lot from maintaining the client, because I can't run the program in per-job mode and have to keep the client attached.

Back to our discussion: I can see there is now a divergence about compiling the job graph on the client versus in the ClusterEntrypoint, and there are upsides and downsides either way. As for the opt-in solution, I have a question: what if the user chooses detached mode, compiles on the client, and runs a multi-job program at the same time? That still won't work. Besides, by adding a compilation option we have to consider more things when submitting a job, like "Does my program include multiple jobs?" or "Does the program need to be initialized before being submitted to a remote cluster?", which looks a bit complicated and confusing to me.

To summarize, I'll vote for the new per-program concept, but I may not prefer the opt-in option mentioned in the mailing list; or maybe we need to reconsider a better concept and definition that is easy to understand.

Best,
Jiayi Liao

Original Message
Sender: Rong Rong <[hidden email]>
Recipient: Regina <[hidden email]>
Cc: Theo Diefenthal <[hidden email]>; [hidden email] <[hidden email]>
Date: Friday, Nov 1, 2019 11:01
Subject: Re: [DISCUSS] Semantic and implementation of per-job mode

Hi All,

Thanks @Tison for starting the discussion. We have a scenario very similar to Theo's use cases. In our case we also generate the job graph using a client service (which serves job graph generation for multiple pieces of user code), and we've found that managing the upload/download between the cluster and the DFS is tricky and error-prone.
In addition, the management of different environments and requirements from different users in a single service creates even more trouble for us.

However, shifting job graph generation to the cluster side also requires some thought about how to manage the driver job, as well as about dependency conflicts: when shipping job graph generation to the cluster, some dependencies that are unnecessary at runtime will be pulled in by the driver job (correct me if I'm wrong, Theo).

In general I agree with @Gyula's main point: unless there is a very strong reason, it is better to make driver mode an opt-in (at least at the beginning). I left some comments on the document as well; please kindly take a look.

Thanks,
Rong

On Thu, Oct 31, 2019 at 9:26 AM Chan, Regina <[hidden email]> wrote:

Yeah, just chiming in on this conversation as well. We heavily use multiple job graphs to get isolation around retry logic and resource allocation across the job graphs. Putting all these parallel flows into a single graph would mean sharing TaskManagers across what was meant to be truly independent.

We also build our job graphs dynamically, based on the state of the world at the start of the job. While we've had our share of the pain described, my understanding is that there would be a trade-off in the number of jobs submitted to the cluster and the corresponding resource allocation requests. In the model with multiple jobs in one program, there is at least the opportunity to reuse idle TaskManagers.

From: Theo Diefenthal <[hidden email]>
Sent: Thursday, October 31, 2019 10:56 AM
To: [hidden email]
Subject: Re: [DISCUSS] Semantic and implementation of per-job mode

I agree with Gyula Fora.

In our case, we have a client machine sitting between our YARN cluster and some backend services which cannot be reached directly from the cluster nodes.
On application startup, we connect to some external systems, fetch some information crucial for the job runtime, and finally build up the job graph to be submitted. It is true that we could work around this, but it would be pretty annoying to connect to the remote services, collect the data, upload it to HDFS, start the job, and make sure housekeeping of those files happens at some later time.

The current behavior also corresponds to the behavior of Spark's driver mode, which made the transition from Spark to Flink easier for us. But I see the point, especially with regard to Kubernetes, and would thus also vote for an opt-in solution: client-side compilation as the default, with the per-program mode available as an option.

Best regards

Von: "Flavio Pompermaier" <[hidden email]>
An: "Yang Wang" <[hidden email]>
CC: "tison" <[hidden email]>, "Newport, Billy" <[hidden email]>, "Paul Lam" <[hidden email]>, "SHI Xiaogang" <[hidden email]>, "dev" <[hidden email]>, "user" <[hidden email]>
Gesendet: Donnerstag, 31. Oktober 2019 10:45:36
Betreff: Re: [DISCUSS] Semantic and implementation of per-job mode

Hi all,

we use multiple jobs in one program a lot, and this is why: we fetch data from a huge number of sources, apply some transformation to each source, and then write the union of all outputs into a single directory (this assumes you're doing batch).
When the number of sources is large, doing this in a single job makes the graph very big, and that is a problem for several reasons:

- too many subtasks/threads per slot
- increased back pressure
- if a single "sub-job" fails, the whole job fails, which is very annoying if it happens after half a day, for example
- in our use case, the big-graph mode takes much longer than running each job separately (but maybe this is only true if you don't have much hardware)
- debugging the cause of a failure can become a daunting task if the job graph is too large
- we faced many strange errors when trying to run in the single big-job mode (due to serialization corruption)

So, summarizing our overall experience with Flink batch: the simpler the job graph, the better!

Best,
Flavio

On Thu, Oct 31, 2019 at 10:14 AM Yang Wang <[hidden email]> wrote:

Thanks to tison for starting this exciting discussion. We also suffer a lot from the per-job mode. I think a per-job cluster is a dedicated cluster for only one job and will not accept any other jobs. It has the advantage of one-step submission: there is no need to start a dispatcher first and then submit the job. And it does not matter where the job graph is generated and the job is submitted. Now we have two cases:

(1) The current YARN detached cluster. The job graph is generated on the client and shipped via the distributed cache to the Flink master container, where the MiniDispatcher uses `FileJobGraphRetriever` to get it. The job is submitted on the Flink master side.

(2) The standalone per-job cluster. User jars are already built into the image, so the job graph is generated on the Flink master side and `ClasspathJobGraphRetriever` is used to get it. The job is also submitted on the Flink master side.

In both (1) and (2), only one job per user program can be supported. Per-job means per-job-graph, so it works just as expected.

Tison suggests adding a new mode, "per-program".
The user jar will be transferred to the Flink master container, and a local client will be started there to generate the job graph and submit the job. I think this could cover all the functionality of the current per-job mode, both (1) and (2). Also, detached and attached modes could be unified: we would no longer need to start a session cluster to simulate per-job mode for multi-part programs. I am in favor of the "per-program" mode. Just two concerns:

1. How many users are running multiple jobs in one program?
2. Why not always use a session cluster to simulate per-job mode? Maybe one-step submission is a convincing reason.

Best,
Yang

tison <[hidden email]> 于2019年10月31日周四 上午9:18写道:

Thanks for your attention!

@[hidden email]

Yes, correct. We try to avoid jobs affecting one another. A local ClusterClient also saves the overhead of retrying before a leader is elected and of persisting the JobGraph before submission (as RestClusterClient does), as well as the network cost.

@Paul Lam

1. There is already a note[1] about multiple-part jobs. I was also a bit confused by this concept at first :-) Things work in a similar way if your program contains only one JobGraph, so per-program acts like per-job-graph in that case, which provides compatibility for the many single-job-graph programs out there. Besides, we have to respect the user program, which the current implementation doesn't: we return abruptly when env#execute is called, hijacking control so that users cannot deal with the job result or its future. I think this is why we would otherwise have to add a detach/attach option.

2. For the compilation part, a workaround could be to upload those resources to a commonly known address such as HDFS, so that compilation can read them from either the client or the cluster.

Best,
tison.
[1] https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430

Newport, Billy <[hidden email]> 于2019年10月30日周三 下午10:41写道:

We execute multiple job graphs routinely because we cannot submit a single graph without it blowing up. I believe Regina spoke to this in Berlin during her talk. If we are processing a database ingestion with 200 tables in it, we do a job graph per table rather than a single job graph that covers all tables. A single job graph can contain tens of thousands of nodes in our largest cases, and we have found Flink (as of 1.3/1.6.4) cannot handle graphs of that size. We're currently testing 1.9.1 but have not retested the large-graph scenario.

Billy

From: Paul Lam [mailto:[hidden email]]
Sent: Wednesday, October 30, 2019 8:41 AM
To: SHI Xiaogang
Cc: tison; dev; user
Subject: Re: [DISCUSS] Semantic and implementation of per-job mode

Hi,

Thanks for starting the discussion.

WRT the per-job semantics, it looks natural to me that per-job means per-job-graph, because in my understanding a JobGraph is the representation of a job. Could you share some use cases in which a user program should contain multiple job graphs?

WRT the per-program mode, I'm also in favor of a unified cluster-side execution for user programs, so +1 from my side. But I think there may be some value in the current per-job mode: we currently have some common resources available on the client machine that are read by the main methods of user programs. If migrated to per-program mode, we would have to explicitly set the specific resources for each user program and ship them to the cluster, which would be a bit inconvenient. Also, as the job graph is compiled on the client, we can recognize errors caused by user code before starting the cluster and can easily get access to the logs.

Best,
Paul Lam

在 2019年10月30日,16:22,SHI Xiaogang <[hidden email]> 写道:

Hi,

Thanks for bringing this up.
The design looks very nice to me in that:

1. In the new per-job mode, we don't need to compile user programs on the client and can directly run user programs with user jars. That way, resource isolation in multi-tenant platforms is easier, and it is much safer.

2. The execution of user programs can be unified across session and per-job modes. In session mode, user jobs are submitted via a remote ClusterClient, while in per-job mode user jobs are submitted via a local ClusterClient.

Regards,
Xiaogang
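As an aside for readers following the thread: the "only the first JobGraph is respected" behavior discussed above comes from extracting the JobGraph by aborting the user program at its first execute() call. A toy model of that hijacking, in plain Python with no Flink involved (all names here are hypothetical, purely for illustration):

```python
# Toy model of JobGraph extraction by hijacking execute(): the "client"
# runs the user program but raises a control-flow exception at the first
# execute() call, so any job after the first is silently skipped.
class _ProgramAbort(Exception):
    """Raised to abort the user program once the first job graph is captured."""
    def __init__(self, job_graph):
        self.job_graph = job_graph

def hijacked_execute(job_name):
    # Stand-in for env.execute(): capture the graph, then abort the program.
    raise _ProgramAbort(job_graph=f"JobGraph({job_name})")

def user_program(execute):
    # A multi-job program in the style Billy and Regina describe:
    # one independent job per table.
    executed = []
    for table in ["orders", "customers", "line_items"]:
        execute(f"ingest-{table}")   # per-job mode only sees the first call
        executed.append(table)       # never reached for any table
    return executed

def extract_first_job_graph(program):
    try:
        program(hijacked_execute)
    except _ProgramAbort as abort:
        return abort.job_graph
    return None

print(extract_first_job_graph(user_program))  # -> JobGraph(ingest-orders)
```

In this model any work after the first execute() never runs, which is exactly the surprise users report with the current per-job mode.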
Hi Tison and Community,
Thanks for bringing it up. Actually, we meet a similar bottleneck of using per cluster mode. Our team built a service for deploying and operating Flink jobs. The service sits in front of yarn clusters. To submit different job jars, we need to download client jar into the service and generate a job graph which is time-consuming. Thus, we find an idea of Delayed Job Graph to make the job graph generation in ClusterEntryPoint rather than on the client-side. Compare to your proposal, it is more lightweight, and it is an option for existing per job mode. But it is not a solution for handling multiple job graph within a program. I am looking forward to more comments on the proposal, and also definitely cooperation on this effort. I hope both of our pain points can be resolved and contribute back to the community. https://docs.google.com/document/d/1aAwVjdZByA-0CHbgv16Me-vjaaDMCfhX7TzVVTuifYM/edit?ts=5da1f4d7#heading=h.be92q3uiam4t Best Regards Peter Huang On Thu, Oct 31, 2019 at 8:17 PM bupt_ljy <[hidden email]> wrote: > Hi all, > > > Firstly thanks @tison for bring this up and strongly +1 for the overall > design. > > > I’d like to add one more example of "multiple jobs in one program" with > what I’m currently working on. I’m trying to run a TPC-DS benchmark testing > (including tens of sql query job) on Flink and sufferring a lot from > maintaining the client because I can’t run this program in per-job mode and > have to make the client attached. > > > Back to our discussion, I can see now there is a divergence of compiling > the job graph between in client and in #ClusterEntrypoint. And up and > downsides exist in either way. As for the opt-in solution, I have a > question, what if the user chooses detach mode, compiling in the client and > runs a multi-job program at the same time? And it still not gonna work. > > Besides, by adding an compiling option, we need to consider more things > when submitting a job like "Is my program including multiple job?" 
or "Does > the program need to be initialized before submitting to a remote cluster?", > which looks a bit complicated and confusing to me. > > > By summarizing, I'll vote for the per-program new concept but I may not > prefer the opt-in option mentioned in the mailing list or maybe we need to > reconsider a better concept and definition which is easy to understand. > > > > Best, > > Jiayi Liao > > Original Message > *Sender:* Rong Rong<[hidden email]> > *Recipient:* Regina" <[hidden email]> > *Cc:* Theo Diefenthal<[hidden email]>; > [hidden email]<[hidden email]> > *Date:* Friday, Nov 1, 2019 11:01 > *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode > > Hi All, > > Thanks @Tison for starting the discussion and I think we have very similar > scenario with Theo's use cases. > In our case we also generates the job graph using a client service (which > serves multiple job graph generation from multiple user code) and we've > found that managing the upload/download between the cluster and the DFS to > be trick and error-prone. In addition, the management of different > environment and requirement from different user in a single service posts > even more trouble for us. > > However, shifting the job graph generation towards the cluster side also > requires some thoughts regarding how to manage the driver-job as well as > some dependencies conflicts - In the case for shipping the job graph > generation to the cluster, some unnecessary dependencies for the runtime > will be pulled in by the driver-job (correct me if I were wrong Theo) > > I think in general I agree with @Gyula's main point: unless there is a > very strong reason, it is better if we put the driver-mode as an opt-in (at > least at the beginning). > I left some comments on the document as well. Please kindly take a look. > > Thanks, > Rong > > On Thu, Oct 31, 2019 at 9:26 AM Chan, Regina <[hidden email]> wrote: > >> Yeah just chiming in this conversation as well. 
We heavily use multiple >> job graphs to get isolation around retry logic and resource allocation >> across the job graphs. Putting all these parallel flows into a single graph >> would mean sharing of TaskManagers across what was meant to be truly >> independent. >> >> >> >> We also build our job graphs dynamically based off of the state of the >> world at the start of the job. While we’ve had a share of the pain >> described, my understanding is that there would be a tradeoff in number of >> jobs being submitted to the cluster and corresponding resource allocation >> requests. In the model with multiple jobs in a program, there’s at least >> the opportunity to reuse idle taskmanagers. >> >> >> >> >> >> >> >> >> >> *From:* Theo Diefenthal <[hidden email]> >> *Sent:* Thursday, October 31, 2019 10:56 AM >> *To:* [hidden email] >> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode >> >> >> >> I agree with Gyula Fora, >> >> >> >> In our case, we have a client-machine in the middle between our YARN >> cluster and some backend services, which can not be reached directly from >> the cluster nodes. On application startup, we connect to some external >> systems, get some information crucial for the job runtime and finally build >> up the job graph to be committed. >> >> >> >> It is true that we could workaround this, but it would be pretty annoying >> to connect to the remote services, collect the data, upload it to HDFS, >> start the job and make sure, housekeeping of those files is also done at >> some later time. >> >> >> >> The current behavior also corresponds to the behavior of Sparks driver >> mode, which made the transition from Spark to Flink easier for us. >> >> >> >> But I see the point, especially in terms of Kubernetes and would thus >> also vote for an opt-in solution, being the client compilation the default >> and having an option for the per-program mode as well. 
>> >> >> >> Best regards >> >> >> ------------------------------ >> >> *Von: *"Flavio Pompermaier" <[hidden email]> >> *An: *"Yang Wang" <[hidden email]> >> *CC: *"tison" <[hidden email]>, "Newport, Billy" < >> [hidden email]>, "Paul Lam" <[hidden email]>, "SHI >> Xiaogang" <[hidden email]>, "dev" <[hidden email]>, "user" >> <[hidden email]> >> *Gesendet: *Donnerstag, 31. Oktober 2019 10:45:36 >> *Betreff: *Re: [DISCUSS] Semantic and implementation of per-job mode >> >> >> >> Hi all, >> >> we're using a lot the multiple jobs in one program and this is why: when >> you fetch data from a huge number of sources and, for each source, you do >> some transformation and then you want to write into a single directory the >> union of all outputs (this assumes you're doing batch). When the number of >> sources is large, if you want to do this in a single job, the graph becomes >> very big and this is a problem for several reasons: >> >> - too many substasks /threadsi per slot >> - increase of back pressure >> - if a single "sub-job" fails all the job fails..this is very >> annoying if this happens after a half a day for example >> - In our use case, the big-graph mode takes much longer than running >> each job separately (but maybe this is true only if you don't have much >> hardware resources) >> - debugging the cause of a fail could become a daunting task if the >> job graph is too large >> - we faced may strange errors when trying to run the single big-job >> mode (due to serialization corruption) >> >> So, summarizing our overall experience with Flink batch is: the easier is >> the job graph the better! >> >> >> >> Best, >> >> Flavio >> >> >> >> >> >> On Thu, Oct 31, 2019 at 10:14 AM Yang Wang <[hidden email]> wrote: >> >> Thanks for tison starting this exciting discussion. We also suffer a lot >> from the per job mode. >> >> I think the per-job cluster is a dedicated cluster for only one job and >> will not accept more other >> >> jobs. 
It has the advantage of one-step submission, do not need to start >> dispatcher first and >> >> then submit the job. And it does not matter where the job graph is >> generated and job is submitted. >> >> Now we have two cases. >> >> >> (1) Current Yarn detached cluster. The job graph is generated in client >> and then use distributed >> >> cache to flink master container. And the MiniDispatcher uses >> `FileJobGraphRetrieve` to get it. >> >> The job will be submitted at flink master side. >> >> >> (2) Standalone per job cluster. User jars are already built into image. >> So the job graph will be >> >> generated at flink master side and `ClasspathJobGraphRetriver` is used to >> get it. The job will >> >> also be submitted at flink master side. >> >> >> For the (1) and (2), only one job in user program could be supported. The >> per job means >> >> per job-graph, so it works just as expected. >> >> >> >> Tison suggests to add a new mode "per-program”. The user jar will be >> transferred to flink master >> >> container, and a local client will be started to generate job graph and >> submit job. I think it could >> >> cover all the functionality of current per job, both (1) and (2). Also >> the detach mode and attach >> >> mode could be unified. We do not need to start a session cluster to >> simulate per job for multiple parts. >> >> >> I am in favor of the “per-program” mode. Just two concerns. >> 1. How many users are using multiple jobs in one program? >> 2. Why do not always use session cluster to simulate per job? Maybe >> one-step submission >> >> is a convincing reason. >> >> Best, >> >> Yang >> >> >> >> tison <[hidden email]> 于2019年10月31日周四 上午9:18写道: >> >> Thanks for your attentions! >> >> >> >> @[hidden email] <[hidden email]> >> >> >> >> Yes correct. We try to avoid jobs affect one another. 
Also a local >> ClusterClient >> >> in case saves the overhead about retry before leader elected and persist >> >> JobGraph before submission in RestClusterClient as well as the net cost. >> >> >> >> @Paul Lam <[hidden email]> >> >> >> >> 1. Here is already a note[1] about multiple part jobs. I am also confused >> a bit >> >> on this concept at first :-) Things go in similar way if you program >> contains the >> >> only JobGraph so that I think per-program acts like per-job-graph in this >> case >> >> which provides compatibility for many of one job graph program. >> >> >> >> Besides, we have to respect user program which doesn't with current >> >> implementation because we return abruptly when calling env#execute which >> >> hijack user control so that they cannot deal with the job result or the >> future of >> >> it. I think this is why we have to add a detach/attach option. >> >> >> >> 2. For compilation part, I think it could be a workaround that you upload >> those >> >> resources in a commonly known address such as HDFS so that compilation >> >> can read from either client or cluster. >> >> >> >> Best, >> >> tison. >> >> >> >> [1] >> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430 >> <https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D14051-3FfocusedCommentId-3D16927430-26page-3Dcom.atlassian.jira.plugin.system.issuetabpanels-253Acomment-2Dtabpanel-23comment-2D16927430&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=vus_2CMQfE0wKmJ4Q_gOWWsBmKlgzMeEwtqShIeKvak&m=yc-Yzv-tHE6HrxNokngJS1rc9d43qyH8bA63kBsSj-Y&s=lZq8trXN1U301YYMxXKDXySRlDfsl8ewJNhDkYEegWw&e=> >> >> >> >> >> >> Newport, Billy <[hidden email]> 于2019年10月30日周三 下午10:41写道: >> >> We execute multiple job graphs routinely because we cannot submit a >> single graph without it blowing up. 
I believe Regina spoke to this in >> Berlin during her talk. We instead if we are processing a database >> ingestion with 200 tables in it, we do a job graph per table rather than a >> single job graph that does all tables instead. A single job graph can be in >> the tens of thousands of nodes in our largest cases and we have found flink >> (as os 1.3/1.6.4) cannot handle graphs of that size. We’re currently >> testing 1.9.1 but have not retested the large graph scenario. >> >> >> >> Billy >> >> >> >> >> >> *From:* Paul Lam [mailto:[hidden email]] >> *Sent:* Wednesday, October 30, 2019 8:41 AM >> *To:* SHI Xiaogang >> *Cc:* tison; dev; user >> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode >> >> >> >> Hi, >> >> >> >> Thanks for starting the discussion. >> >> >> >> WRT the per-job semantic, it looks natural to me that per-job means >> per-job-graph, >> >> because in my understanding JobGraph is the representation of a job. >> Could you >> >> share some use case in which a user program should contain multiple job >> graphs? >> >> >> >> WRT the per-program mode, I’m also in flavor of a unified cluster-side >> execution >> >> for user program, so +1 from my side. >> >> >> >> But I think there may be some values for the current per-job mode: we now >> have >> >> some common resources available on the client machine that would be read >> by main >> >> methods in user programs. If migrated to per-program mode, we must >> explicitly >> >> set the specific resources for each user program and ship them to the >> cluster, >> >> it would be a bit inconvenient. Also, as the job graph is compiled at >> the client, >> >> we can recognize the errors caused by user code before starting the >> cluster >> >> and easily get access to the logs. >> >> >> >> Best, >> >> Paul Lam >> >> >> >> 在 2019年10月30日,16:22,SHI Xiaogang <[hidden email]> 写道: >> >> >> >> Hi >> >> >> >> Thanks for bringing this. >> >> >> >> The design looks very nice to me in that >> >> 1. 
In the new per-job mode, we don't need to compile user programs in the >> client and can directly run user programs with user jars. That way, it's >> easier for resource isolation in multi-tenant platforms and is much safer. >> >> 2. The execution of user programs can be unified in session and per-job >> modes. In session mode, user jobs are submitted via a remote ClusterClient >> while in per-job mode user jobs are submitted via a local ClusterClient. >> >> >> >> Regards, >> >> Xiaogang >> >> >> >> tison <[hidden email]> 于2019年10月30日周三 下午3:30写道: >> >> (CC user list because I think users may have ideas on how per-job mode >> should look like) >> >> >> >> Hi all, >> >> In the discussion about Flink on k8s[1] we encounter a problem that >> opinions >> diverge in how so-called per-job mode works. This thread is aimed at >> stating >> a dedicated discussion about per-job semantic and how to implement it. >> >> **The AS IS per-job mode** >> >> * in standalone deployment, we bundle user jar with Flink jar, retrieve >> JobGraph which is the very first JobGraph from user program in classpath, >> and then start a Dispatcher with this JobGraph preconfigured, which >> launches it as "recovered" job. >> >> * in YARN deployment, we accept submission via CliFrontend, extract >> JobGraph >> which is the very first JobGraph from user program submitted, serialize >> the JobGraph and upload it to YARN as resource, and then when AM starts, >> retrieve the JobGraph as resource and start Dispatcher with this JobGraph >> preconfigured, follows are the same. >> >> Specifically, in order to support multiple parts job, if YARN deployment >> configured as "attached", it starts a SessionCluster, proceeds the >> progress >> and shutdown the cluster on job finished. >> >> **Motivation** >> >> The implementation mentioned above, however, suffers from problems. The >> major >> two of them are 1. only respect the very first JobGraph from user program >> 2. >> compile job in client side >> >> 1. 
Only respect the very first JobGraph from the user program
>>
>> There is already an issue about this topic[2]. Since we extract the
>> JobGraph from the user program by hijacking Environment#execute, we
>> actually abort any execution after the first call to #execute. Besides, it
>> has surprised users many times that logic they write in the program may
>> never be executed; the underlying problem here is the semantics of "job"
>> from Flink's perspective. I'd say that in the current implementation
>> "per-job" is actually "per-job-graph". However, in practice, since we
>> support jar submission, it is "per-program" semantics that users want.
>>
>> 2. Compile the job on the client side
>>
>> Standalone deployment is not affected here. But in YARN deployment, we
>> compile the job and get the JobGraph on the client side, and then upload
>> it to YARN. This approach, however, somewhat breaks isolation. We have
>> observed user programs containing exception-handling logic that calls
>> System.exit in the main method, which causes a compilation of the job to
>> exit the whole client at once. This is a critical problem if we manage
>> multiple Flink jobs in a unified platform: in that case, it shuts down the
>> whole service.
>>
>> Besides, I have been asked many times why per-job mode doesn't run "just
>> like" session mode but with a dedicated cluster. This might imply that the
>> current implementation mismatches users' demands.
>>
>> **Proposal**
>>
>> In order to provide a "per-program" semantic mode which acts "just like"
>> session mode but with a dedicated cluster, I propose the workflow below.
>> It acts like starting a driver on the cluster, but it is not a general
>> driver solution as proposed here[3]; the main purpose of the workflow
>> below is to provide a "per-program" semantic mode.
>>
>> *From CliFrontend*
>>
>> 1. CliFrontend receives the submission, gathers all configuration, and
>> starts a corresponding ClusterDescriptor.
>>
>> 2.
ClusterDescriptor deploys a cluster with main class ProgramClusterEntrypoint
>> while shipping resources including the user program.
>>
>> 3. ProgramClusterEntrypoint#main starts components including a standalone
>> Dispatcher, configures the user program to use an RpcClusterClient, and
>> then invokes the main method of the user program.
>>
>> 4. RpcClusterClient acts like MiniClusterClient: it is able to submit the
>> JobGraph once the leader is elected, so that we don't fall back to
>> round-robin or fail the submission because there is no leader yet.
>>
>> 5. Whether or not the job result is delivered depends on user program
>> logic, since we can already get a JobClient from execute.
>> ProgramClusterEntrypoint exits when the user program exits and all
>> submitted jobs reach a globally terminal state.
>>
>> This fits the direction of FLIP-73, because the strategy of starting an
>> RpcClusterClient can be regarded as a special Executor. After
>> ProgramClusterEntrypoint#main starts the cluster, it generates and passes
>> configuration to the user program, so that when the Executor is generated
>> it knows to use an RpcClusterClient for submission and knows the address
>> of the Dispatcher.
>>
>> **Compatibility**
>>
>> In my opinion this mode can be purely an add-on to the current codebase.
>> We don't actually replace the current per-job mode with the so-called
>> "per-program" mode. It may turn out that the current per-job mode becomes
>> unnecessary once we have such a "per-program" mode, in which case we could
>> deprecate it in favor of the other.
>>
>> I'm glad to discuss more details if you're interested, but we'd better
>> first reach a consensus on the overall design :-)
>>
>> Looking forward to your reply!
>>
>> Best,
>> tison.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-9953
>> [2] https://issues.apache.org/jira/browse/FLINK-10879
>> [3] https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#
>>
>> ------------------------------
>>
>> Your Personal Data: We may collect and process information about you that
>> may be subject to data protection laws. For more information about how we
>> use and disclose your personal data, how we protect your information, our
>> legal basis to use your information, your rights and who you can contact,
>> please refer to: www.gs.com/privacy-notices
>> > |
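The five-step workflow quoted above can be sketched as a tiny in-process model. This is an illustration only: `ProgramClusterEntrypoint` and `RpcClusterClient` are names from the proposal rather than existing Flink classes, and the dispatcher here is a plain Java stand-in for Flink's Dispatcher.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the proposed "per-program" flow. The class names
// follow the proposal (ProgramClusterEntrypoint, RpcClusterClient); they
// are not existing Flink APIs.
public class ProgramClusterEntrypointSketch {

    // Stand-in for the Dispatcher started inside the entrypoint: it
    // simply records every job graph submitted to it.
    static class LocalDispatcher {
        final List<String> submittedJobs = new ArrayList<>();
        void submit(String jobGraph) { submittedJobs.add(jobGraph); }
    }

    // Stand-in for the proposed RpcClusterClient: a local client wired to
    // the in-process dispatcher, so every submission from the user
    // program actually lands in the dispatcher, not just the first one.
    static class RpcClusterClient {
        private final LocalDispatcher dispatcher;
        RpcClusterClient(LocalDispatcher dispatcher) { this.dispatcher = dispatcher; }
        void submitJob(String jobGraph) { dispatcher.submit(jobGraph); }
    }

    // A user program containing two jobs. Under today's per-job-graph
    // semantics only the first would be respected; under the proposed
    // per-program semantics both run.
    static void userMain(RpcClusterClient client) {
        client.submitJob("ingest-table-a");
        client.submitJob("ingest-table-b");
    }

    public static void main(String[] args) {
        // Step 3 of the proposal: start components, configure the user
        // program with a local client, then invoke its main method.
        LocalDispatcher dispatcher = new LocalDispatcher();
        userMain(new RpcClusterClient(dispatcher));
        // Step 5: the entrypoint exits once the user program returns and
        // all submitted jobs have terminated.
        System.out.println(dispatcher.submittedJobs); // [ingest-table-a, ingest-table-b]
    }
}
```

Running this prints both job names: every submission in the user program reaches the dispatcher, which is exactly what the per-job-graph extraction cannot provide.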
Hi all,
Thanks for your participation! First of all I have to clarify two confusions
in this thread.

1. The proposed "per-program" mode is definitely a new, opt-in mode. It is
described in the "Compatibility" section of the original email.

2. The document linked in the original email, "Flink driver", is NOT the
proposed design. See the original paragraph below. I'm sorry for linking it
in the first email, which caused further confusion.

>In order to provide a "per-program" semantic mode which acts "just like" session
>mode but with a dedicated cluster, I propose a workflow as below. It acts like
>starting a driver on cluster but is NOT a general driver solution as proposed
>here[3], the main purpose of the workflow below is for providing a "per-program"
>semantic mode.

I'm reading your ideas in detail and writing a reply now :-)

Best,
tison.

On Fri, Nov 1, 2019 at 12:47 PM, Peter Huang <[hidden email]> wrote:

> Hi Tison and Community,
>
> Thanks for bringing it up. Actually, we met a similar bottleneck when
> using per-job cluster mode. Our team built a service for deploying and
> operating Flink jobs. The service sits in front of YARN clusters. To
> submit different job jars, we need to download the client jar into the
> service and generate a job graph, which is time-consuming.
> Thus, we came up with the idea of a Delayed Job Graph to move job graph
> generation into the ClusterEntrypoint rather than doing it on the client
> side. Compared to your proposal, it is more lightweight, and it is an
> option for the existing per-job mode. But it is not a solution for
> handling multiple job graphs within a program.
>
> I am looking forward to more comments on the proposal, and also definitely
> to cooperation on this effort. I hope both of our pain points can be
> resolved and contributed back to the community.
>
> https://docs.google.com/document/d/1aAwVjdZByA-0CHbgv16Me-vjaaDMCfhX7TzVVTuifYM/edit?ts=5da1f4d7#heading=h.be92q3uiam4t
>
> Best Regards
> Peter Huang
>
> On Thu, Oct 31, 2019 at 8:17 PM bupt_ljy <[hidden email]> wrote:
>
>> Hi all,
>>
>> Firstly, thanks @tison for bringing this up, and a strong +1 for the
>> overall design.
>>
>> I’d like to add one more example of "multiple jobs in one program" from
>> what I’m currently working on. I’m trying to run a TPC-DS benchmark
>> (including tens of SQL query jobs) on Flink and suffering a lot from
>> maintaining the client, because I can’t run this program in per-job mode
>> and have to keep the client attached.
>>
>> Back to our discussion, I can see there is now a divergence on whether to
>> compile the job graph in the client or in the ClusterEntrypoint, and
>> there are upsides and downsides either way. As for the opt-in solution, I
>> have a question: what if the user chooses detached mode, compiling in the
>> client, and runs a multi-job program at the same time? It is still not
>> going to work.
>>
>> Besides, by adding a compiling option, we need to consider more things
>> when submitting a job, like "Does my program include multiple jobs?" or
>> "Does the program need to be initialized before submitting to a remote
>> cluster?", which looks a bit complicated and confusing to me.
>>
>> To summarize, I'll vote for the new per-program concept, but I may not
>> prefer the opt-in option mentioned in the mailing list; or maybe we need
>> to reconsider a better concept and definition that is easy to understand.
>>
>> Best,
>> Jiayi Liao
>>
>> Original Message
>> *Sender:* Rong Rong <[hidden email]>
>> *Recipient:* Regina <[hidden email]>
>> *Cc:* Theo Diefenthal <[hidden email]>; [hidden email] <[hidden email]>
>> *Date:* Friday, Nov 1, 2019 11:01
>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>>
>> Hi All,
>>
>> Thanks @Tison for starting the discussion. I think we have a very similar
>> scenario to Theo's use cases.
>> In our case we also generate the job graph using a client service (which
>> serves job graph generation for multiple users' code), and we've found
>> managing the upload/download between the cluster and the DFS to be tricky
>> and error-prone. In addition, the management of different environments
>> and requirements from different users in a single service poses even more
>> trouble for us.
>>
>> However, shifting job graph generation towards the cluster side also
>> requires some thought regarding how to manage the driver job, as well as
>> some dependency conflicts: in the case of shipping job graph generation
>> to the cluster, some dependencies unnecessary for the runtime will be
>> pulled in by the driver job (correct me if I'm wrong, Theo).
>>
>> I think in general I agree with @Gyula's main point: unless there is a
>> very strong reason, it is better if we make the driver mode opt-in (at
>> least at the beginning).
>> I left some comments on the document as well. Please kindly take a look.
>>
>> Thanks,
>> Rong
>>
>> On Thu, Oct 31, 2019 at 9:26 AM Chan, Regina <[hidden email]> wrote:
>>
>>> Yeah, just chiming in on this conversation as well. We heavily use
>>> multiple job graphs to get isolation around retry logic and resource
>>> allocation across the job graphs. Putting all these parallel flows into
>>> a single graph would mean sharing TaskManagers across what was meant to
>>> be truly independent.
>>>
>>> We also build our job graphs dynamically based on the state of the world
>>> at the start of the job. While we’ve had our share of the pain
>>> described, my understanding is that there would be a tradeoff in the
>>> number of jobs being submitted to the cluster and the corresponding
>>> resource allocation requests. In the model with multiple jobs in a
>>> program, there’s at least the opportunity to reuse idle TaskManagers.
>>>
>>> *From:* Theo Diefenthal <[hidden email]>
>>> *Sent:* Thursday, October 31, 2019 10:56 AM
>>> *To:* [hidden email]
>>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>>>
>>> I agree with Gyula Fora,
>>>
>>> In our case, we have a client machine in the middle between our YARN
>>> cluster and some backend services, which cannot be reached directly from
>>> the cluster nodes. On application startup, we connect to some external
>>> systems, get some information crucial for the job runtime, and finally
>>> build up the job graph to be submitted.
>>>
>>> It is true that we could work around this, but it would be pretty
>>> annoying to connect to the remote services, collect the data, upload it
>>> to HDFS, start the job, and make sure housekeeping of those files is
>>> also done at some later time.
>>>
>>> The current behavior also corresponds to the behavior of Spark's driver
>>> mode, which made the transition from Spark to Flink easier for us.
>>>
>>> But I see the point, especially in terms of Kubernetes, and would thus
>>> also vote for an opt-in solution, with client-side compilation as the
>>> default and an option for the per-program mode as well.
>>>
>>> Best regards
>>>
>>> ------------------------------
>>>
>>> *From:* "Flavio Pompermaier" <[hidden email]>
>>> *To:* "Yang Wang" <[hidden email]>
>>> *Cc:* "tison" <[hidden email]>, "Newport, Billy" <[hidden email]>,
>>> "Paul Lam" <[hidden email]>, "SHI Xiaogang" <[hidden email]>,
>>> "dev" <[hidden email]>, "user" <[hidden email]>
>>> *Sent:* Thursday, 31 October 2019 10:45:36
>>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>>>
>>> Hi all,
>>>
>>> We use multiple jobs in one program a lot, and this is why: when you
>>> fetch data from a huge number of sources and, for each source, you do
>>> some transformation and then want to write the union of all outputs into
>>> a single directory (this assumes you're doing batch). When the number of
>>> sources is large, if you want to do this in a single job, the graph
>>> becomes very big, and this is a problem for several reasons:
>>>
>>> - too many subtasks/threads per slot
>>> - increased back pressure
>>> - if a single "sub-job" fails, the whole job fails... this is very
>>> annoying if it happens after half a day, for example
>>> - in our use case, the big-graph mode takes much longer than running
>>> each job separately (but maybe this is true only if you don't have many
>>> hardware resources)
>>> - debugging the cause of a failure can become a daunting task if the
>>> job graph is too large
>>> - we faced many strange errors when trying to run the single big-job
>>> mode (due to serialization corruption)
>>>
>>> So, summarizing our overall experience with Flink batch: the simpler the
>>> job graph, the better!
>>>
>>> Best,
>>> Flavio
>>>
>>> On Thu, Oct 31, 2019 at 10:14 AM Yang Wang <[hidden email]> wrote:
>>>
>>> Thanks to tison for starting this exciting discussion. We also suffer a
>>> lot from the per-job mode.
>>>
>>> I think the per-job cluster is a dedicated cluster for only one job and
>>> will not accept other jobs. It has the advantage of one-step submission:
>>> we do not need to start a dispatcher first and then submit the job. And
>>> it does not matter where the job graph is generated and the job is
>>> submitted. Now we have two cases.
>>>
>>> (1) The current YARN detached cluster. The job graph is generated in the
>>> client and then shipped via the distributed cache to the Flink master
>>> container, and the MiniDispatcher uses `FileJobGraphRetriever` to get
>>> it. The job is submitted on the Flink master side.
>>>
>>> (2) The standalone per-job cluster. User jars are already built into the
>>> image, so the job graph is generated on the Flink master side and
>>> `ClassPathJobGraphRetriever` is used to get it. The job is also
>>> submitted on the Flink master side.
>>>
>>> For both (1) and (2), only one job per user program is supported. Per-job
>>> means per-job-graph, so it works just as expected.
>>>
>>> Tison suggests adding a new mode, "per-program". The user jar is
>>> transferred to the Flink master container, and a local client is started
>>> to generate the job graph and submit the job. I think it could cover all
>>> the functionality of the current per-job mode, both (1) and (2). Also,
>>> the detached and attached modes could be unified. We would not need to
>>> start a session cluster to simulate per-job for multi-part jobs.
>>>
>>> I am in favor of the "per-program" mode. Just two concerns:
>>> 1. How many users are running multiple jobs in one program?
>>> 2. Why not always use a session cluster to simulate per-job? Maybe
>>> one-step submission is a convincing reason.
>>>
>>> Best,
>>> Yang
>>>
>>> On Thu, Oct 31, 2019 at 9:18 AM, tison <[hidden email]> wrote:
>>>
>>> Thanks for your attention!
>>>
>>> @[hidden email] <[hidden email]>
>>>
>>> Yes, correct.
We try to avoid jobs affecting one another. Also, a local ClusterClient
>>> in this case saves the overhead of retrying before the leader is elected
>>> and of persisting the JobGraph before submission as in RestClusterClient,
>>> as well as the network cost.
>>>
>>> @Paul Lam <[hidden email]>
>>>
>>> 1. There is already a note[1] about multi-part jobs. I was also a bit
>>> confused by this concept at first :-) Things go in a similar way if your
>>> program contains only one JobGraph, so I think per-program acts like
>>> per-job-graph in this case, which provides compatibility for the many
>>> programs with a single job graph.
>>>
>>> Besides, we have to respect the user program, which the current
>>> implementation doesn't, because we return abruptly when calling
>>> env#execute, hijacking user control so that users cannot deal with the
>>> job result or its future. I think this is why we have to add a
>>> detach/attach option.
>>>
>>> 2. For the compilation part, I think a workaround could be to upload
>>> those resources to a commonly known address such as HDFS, so that
>>> compilation can read them from either the client or the cluster.
>>>
>>> Best,
>>> tison.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430
>>>
>>> Newport, Billy <[hidden email]> wrote on Wed, Oct 30, 2019 at 10:41 PM:
>>>
>>> We execute multiple job graphs routinely because we cannot submit a
>>> single graph without it blowing up. I believe Regina spoke to this in
>>> Berlin during her talk. Instead, if we are processing a database
>>> ingestion with 200 tables in it, we do a job graph per table rather than
>>> a single job graph that handles all tables. A single job graph can be in
>>> the tens of thousands of nodes in our largest cases, and we have found
>>> Flink (as of 1.3/1.6.4) cannot handle graphs of that size. We’re
>>> currently testing 1.9.1 but have not retested the large graph scenario.
>>>
>>> Billy
>>>
>>> *From:* Paul Lam [mailto:[hidden email]]
>>> *Sent:* Wednesday, October 30, 2019 8:41 AM
>>> *To:* SHI Xiaogang
>>> *Cc:* tison; dev; user
>>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>>>
>>> Hi,
>>>
>>> Thanks for starting the discussion.
>>>
>>> WRT the per-job semantics, it looks natural to me that per-job means
>>> per-job-graph, because in my understanding JobGraph is the representation
>>> of a job. Could you share some use case in which a user program should
>>> contain multiple job graphs?
>>>
>>> [snip: the remainder of the quoted thread (Paul Lam's reply, SHI
>>> Xiaogang's reply, and tison's original proposal) is quoted in full
>>> earlier in this thread]
>>> >> |
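Several replies above (SHI Xiaogang's and Yang Wang's in particular) observe that session and per-program modes could share one user-facing execution path, differing only in which ClusterClient the FLIP-73 Executor is wired to. A minimal sketch of that selection follows, assuming a configuration key in the spirit of Flink's `execution.target`; the `per-program` value and the `RpcClusterClient` name are taken from the proposal, not settled API:

```java
import java.util.Map;

// Hedged sketch of FLIP-73-style executor selection: the entrypoint
// writes configuration before invoking the user program's main method,
// and the executor resolves the submission client from that
// configuration alone. Both client classes here are plain stand-ins.
public class ExecutorSelectionSketch {

    interface ClusterClient { String describe(); }

    // Session mode: submit to a long-running cluster remotely.
    static class RestClusterClient implements ClusterClient {
        public String describe() { return "remote submission via REST"; }
    }

    // Per-program mode: submit locally, inside the cluster entrypoint.
    static class RpcClusterClient implements ClusterClient {
        public String describe() { return "local submission inside the entrypoint"; }
    }

    // The user program never branches on deployment mode; the same
    // execute() path works in both modes because the client is chosen
    // from configuration.
    static ClusterClient clientFor(Map<String, String> config) {
        return "per-program".equals(config.get("execution.target"))
                ? new RpcClusterClient()
                : new RestClusterClient();
    }

    public static void main(String[] args) {
        System.out.println(clientFor(Map.of("execution.target", "session")).describe());
        System.out.println(clientFor(Map.of("execution.target", "per-program")).describe());
    }
}
```

The design choice this models is the one tison describes: the entrypoint generates configuration for the user program so that, when the Executor is created, it knows to submit locally.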
Hi Peter,
I checked out your proposal FLIP-85 and think that we are moving in a very
similar direction. As in your proposal, we can create the PackagedProgram on
the server side, and thus, if we can configure the environment properly, we
can directly invoke the main method.

In addition to your design document: in fact, PackagedProgram does not need
to be a class in flink-client. With the related Exceptions moved to
flink-runtime, it can become a flink-runtime concept, and thus we don't
actually suffer from the dependency conflict.

Best,
tison.

On Fri, Nov 1, 2019 at 2:17 PM, tison <[hidden email]> wrote:

> Hi all,
>
> Thanks for your participation! First of all I have to clarify two
> confusions in this thread.
>
> 1. The proposed "per-program" mode is definitely a new, opt-in mode. It is
> described in the "Compatibility" section of the original email.
>
> 2. The document linked in the original email, "Flink driver", is NOT the
> proposed design. See the original paragraph below. I'm sorry for linking
> it in the first email, which caused further confusion.
>
> >In order to provide a "per-program" semantic mode which acts "just like" session
> >mode but with a dedicated cluster, I propose a workflow as below. It acts like
> >starting a driver on cluster but is NOT a general driver solution as proposed
> >here[3], the main purpose of the workflow below is for providing a "per-program"
> >semantic mode.
>
> I'm reading your ideas in detail and writing a reply now :-)
>
> Best,
> tison.
>
> Peter Huang <[hidden email]> wrote on Fri, Nov 1, 2019 at 12:47 PM:
>
>> Hi Tison and Community,
>>
>> Thanks for bringing it up. Actually, we met a similar bottleneck when
>> using per-job cluster mode. Our team built a service for deploying and
>> operating Flink jobs. The service sits in front of YARN clusters. To
>> submit different job jars, we need to download the client jar into the
>> service and generate a job graph, which is time-consuming.
>> Thus, we find an idea of Delayed Job Graph to make the job graph >> generation in ClusterEntryPoint rather than on the client-side. Compare to >> your proposal, it is more lightweight, >> and it is an option for existing per job mode. But it is not a solution >> for handling multiple job graph within a program. >> >> I am looking forward to more comments on the proposal, and also >> definitely cooperation on this effort. >> I hope both of our pain points can be resolved and contribute back to the >> community. >> >> >> >> https://docs.google.com/document/d/1aAwVjdZByA-0CHbgv16Me-vjaaDMCfhX7TzVVTuifYM/edit?ts=5da1f4d7#heading=h.be92q3uiam4t >> >> >> Best Regards >> Peter Huang >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> On Thu, Oct 31, 2019 at 8:17 PM bupt_ljy <[hidden email]> wrote: >> >>> Hi all, >>> >>> >>> Firstly thanks @tison for bring this up and strongly +1 for the overall >>> design. >>> >>> >>> I’d like to add one more example of "multiple jobs in one program" with >>> what I’m currently working on. I’m trying to run a TPC-DS benchmark testing >>> (including tens of sql query job) on Flink and sufferring a lot from >>> maintaining the client because I can’t run this program in per-job mode and >>> have to make the client attached. >>> >>> >>> Back to our discussion, I can see now there is a divergence of compiling >>> the job graph between in client and in #ClusterEntrypoint. And up and >>> downsides exist in either way. As for the opt-in solution, I have a >>> question, what if the user chooses detach mode, compiling in the client and >>> runs a multi-job program at the same time? And it still not gonna work. >>> >>> Besides, by adding an compiling option, we need to consider more things >>> when submitting a job like "Is my program including multiple job?" or "Does >>> the program need to be initialized before submitting to a remote cluster?", >>> which looks a bit complicated and confusing to me. 
>>> To summarize, I'll vote for the new per-program concept, but I may not
>>> prefer the opt-in option mentioned in the mailing list; maybe we need to
>>> reconsider and find a concept and definition that is easier to
>>> understand.
>>>
>>> Best,
>>>
>>> Jiayi Liao
>>>
>>> Original Message
>>> *Sender:* Rong Rong <[hidden email]>
>>> *Recipient:* Regina <[hidden email]>
>>> *Cc:* Theo Diefenthal <[hidden email]>; [hidden email]
>>> *Date:* Friday, Nov 1, 2019 11:01
>>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>>>
>>> Hi All,
>>>
>>> Thanks @Tison for starting the discussion; I think we have a very
>>> similar scenario to Theo's use cases. In our case we also generate the
>>> job graph using a client service (which serves job graph generation for
>>> multiple user programs), and we've found managing the upload/download
>>> between the cluster and the DFS to be tricky and error-prone. In
>>> addition, managing different environments and requirements from
>>> different users in a single service poses even more trouble for us.
>>>
>>> However, shifting job graph generation towards the cluster side also
>>> requires some thought on how to manage the driver job, as well as some
>>> dependency conflicts: when shipping job graph generation to the cluster,
>>> some dependencies unnecessary for the runtime will be pulled in by the
>>> driver job (correct me if I'm wrong, Theo).
>>>
>>> In general I agree with @Gyula's main point: unless there is a very
>>> strong reason, it is better if we make the driver mode an opt-in (at
>>> least at the beginning). I left some comments on the document as well.
>>> Please kindly take a look.
>>>
>>> Thanks,
>>> Rong
>>>
>>> On Thu, Oct 31, 2019 at 9:26 AM Chan, Regina <[hidden email]> wrote:
>>>
>>>> Yeah, just chiming in on this conversation as well.
>>>> We heavily use multiple job graphs to get isolation around retry logic
>>>> and resource allocation across the job graphs. Putting all these
>>>> parallel flows into a single graph would mean sharing TaskManagers
>>>> across what was meant to be truly independent.
>>>>
>>>> We also build our job graphs dynamically based on the state of the
>>>> world at the start of the job. While we've had our share of the pain
>>>> described, my understanding is that there would be a tradeoff in the
>>>> number of jobs being submitted to the cluster and the corresponding
>>>> resource allocation requests. In the model with multiple jobs in a
>>>> program, there's at least the opportunity to reuse idle TaskManagers.
>>>>
>>>> *From:* Theo Diefenthal <[hidden email]>
>>>> *Sent:* Thursday, October 31, 2019 10:56 AM
>>>> *To:* [hidden email]
>>>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>>>>
>>>> I agree with Gyula Fora.
>>>>
>>>> In our case, we have a client machine sitting between our YARN cluster
>>>> and some backend services which cannot be reached directly from the
>>>> cluster nodes. On application startup, we connect to some external
>>>> systems, get some information crucial for the job runtime, and finally
>>>> build up the job graph to be submitted.
>>>>
>>>> It is true that we could work around this, but it would be pretty
>>>> annoying to connect to the remote services, collect the data, upload it
>>>> to HDFS, start the job, and make sure housekeeping of those files is
>>>> also done at some later time.
>>>>
>>>> The current behavior also corresponds to the behavior of Spark's driver
>>>> mode, which made the transition from Spark to Flink easier for us.
>>>> But I see the point, especially in terms of Kubernetes, and would thus
>>>> also vote for an opt-in solution, with client-side compilation as the
>>>> default and the per-program mode available as an option.
>>>>
>>>> Best regards
>>>>
>>>> ------------------------------
>>>>
>>>> *From: *"Flavio Pompermaier" <[hidden email]>
>>>> *To: *"Yang Wang" <[hidden email]>
>>>> *CC: *"tison" <[hidden email]>, "Newport, Billy" <[hidden email]>,
>>>> "Paul Lam" <[hidden email]>, "SHI Xiaogang" <[hidden email]>,
>>>> "dev" <[hidden email]>, "user" <[hidden email]>
>>>> *Sent: *Thursday, October 31, 2019 10:45:36
>>>> *Subject: *Re: [DISCUSS] Semantic and implementation of per-job mode
>>>>
>>>> Hi all,
>>>>
>>>> we use multiple jobs in one program a lot, and this is why: you fetch
>>>> data from a huge number of sources, do some transformation for each
>>>> source, and then write the union of all outputs into a single directory
>>>> (this assumes you're doing batch). When the number of sources is large,
>>>> doing this in a single job makes the graph very big, and this is a
>>>> problem for several reasons:
>>>>
>>>> - too many subtasks/threads per slot
>>>> - increased back pressure
>>>> - if a single "sub-job" fails, the whole job fails; this is very
>>>> annoying if it happens after half a day, for example
>>>> - in our use case, the big-graph mode takes much longer than running
>>>> each job separately (but maybe this is true only if you don't have
>>>> much hardware)
>>>> - debugging the cause of a failure can become a daunting task if the
>>>> job graph is too large
>>>> - we faced many strange errors when trying to run the single big-job
>>>> mode (due to serialization corruption)
>>>>
>>>> So, summarizing our overall experience with Flink batch: the simpler
>>>> the job graph, the better!
>>>> Best,
>>>> Flavio
>>>>
>>>> On Thu, Oct 31, 2019 at 10:14 AM Yang Wang <[hidden email]> wrote:
>>>>
>>>> Thanks tison for starting this exciting discussion. We also suffer a
>>>> lot from per-job mode.
>>>>
>>>> I think a per-job cluster is a dedicated cluster for only one job that
>>>> will not accept any other jobs. It has the advantage of one-step
>>>> submission: no need to start a dispatcher first and then submit the
>>>> job. And it does not matter where the job graph is generated and the
>>>> job is submitted. Now we have two cases.
>>>>
>>>> (1) The current YARN detached cluster. The job graph is generated in
>>>> the client and then shipped via distributed cache to the Flink master
>>>> container. The MiniDispatcher uses `FileJobGraphRetriever` to get it.
>>>> The job is submitted on the Flink master side.
>>>>
>>>> (2) The standalone per-job cluster. User jars are already built into
>>>> the image, so the job graph is generated on the Flink master side and
>>>> `ClassPathJobGraphRetriever` is used to get it. The job is also
>>>> submitted on the Flink master side.
>>>>
>>>> For both (1) and (2), only one job per user program is supported. The
>>>> "per-job" here means per job-graph, so it works just as expected.
>>>>
>>>> Tison suggests adding a new mode, "per-program". The user jar will be
>>>> transferred to the Flink master container, and a local client will be
>>>> started to generate the job graph and submit the job. I think it could
>>>> cover all the functionality of the current per-job mode, both (1) and
>>>> (2). Also, detached and attached mode could be unified. We would not
>>>> need to start a session cluster to simulate per-job mode for
>>>> multiple-part jobs.
>>>>
>>>> I am in favor of the "per-program" mode. Just two concerns:
>>>> 1. How many users are using multiple jobs in one program?
>>>> 2.
Why not always use a session cluster to simulate per-job mode? Maybe
>>>> one-step submission is a convincing reason.
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> tison <[hidden email]> wrote on Thu, Oct 31, 2019 at 9:18 AM:
>>>>
>>>> Thanks for your attention!
>>>>
>>>> @[hidden email] <[hidden email]>
>>>>
>>>> Yes, correct. We try to avoid jobs affecting one another. Also, a local
>>>> ClusterClient saves the overhead of retrying before a leader is
>>>> elected and of persisting the JobGraph before submission (as in
>>>> RestClusterClient), as well as the network cost.
>>>>
>>>> @Paul Lam <[hidden email]>
>>>>
>>>> 1. There is already a note[1] about multiple-part jobs. I was also a
>>>> bit confused by this concept at first :-) Things go the same way if
>>>> your program contains only one JobGraph, so per-program acts like
>>>> per-job-graph in this case, which provides compatibility for the many
>>>> programs with a single job graph.
>>>>
>>>> Besides, we have to respect the user program, which the current
>>>> implementation doesn't, because we return abruptly when calling
>>>> env#execute; this hijacks user control so that users cannot deal with
>>>> the job result or its future. I think this is why we have to add a
>>>> detach/attach option.
>>>>
>>>> 2. For the compilation part, a workaround could be to upload those
>>>> resources to a commonly known address such as HDFS, so that compilation
>>>> can read them from either the client or the cluster.
>>>>
>>>> Best,
>>>> tison.
>>>> >>>> >>>> >>>> [1] >>>> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430 >>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D14051-3FfocusedCommentId-3D16927430-26page-3Dcom.atlassian.jira.plugin.system.issuetabpanels-253Acomment-2Dtabpanel-23comment-2D16927430&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=vus_2CMQfE0wKmJ4Q_gOWWsBmKlgzMeEwtqShIeKvak&m=yc-Yzv-tHE6HrxNokngJS1rc9d43qyH8bA63kBsSj-Y&s=lZq8trXN1U301YYMxXKDXySRlDfsl8ewJNhDkYEegWw&e=> >>>> >>>> >>>> >>>> >>>> >>>> Newport, Billy <[hidden email]> 于2019年10月30日周三 下午10:41写道: >>>> >>>> We execute multiple job graphs routinely because we cannot submit a >>>> single graph without it blowing up. I believe Regina spoke to this in >>>> Berlin during her talk. We instead if we are processing a database >>>> ingestion with 200 tables in it, we do a job graph per table rather than a >>>> single job graph that does all tables instead. A single job graph can be in >>>> the tens of thousands of nodes in our largest cases and we have found flink >>>> (as os 1.3/1.6.4) cannot handle graphs of that size. We’re currently >>>> testing 1.9.1 but have not retested the large graph scenario. >>>> >>>> >>>> >>>> Billy >>>> >>>> >>>> >>>> >>>> >>>> *From:* Paul Lam [mailto:[hidden email]] >>>> *Sent:* Wednesday, October 30, 2019 8:41 AM >>>> *To:* SHI Xiaogang >>>> *Cc:* tison; dev; user >>>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode >>>> >>>> >>>> >>>> Hi, >>>> >>>> >>>> >>>> Thanks for starting the discussion. >>>> >>>> >>>> >>>> WRT the per-job semantic, it looks natural to me that per-job means >>>> per-job-graph, >>>> >>>> because in my understanding JobGraph is the representation of a job. >>>> Could you >>>> >>>> share some use case in which a user program should contain multiple job >>>> graphs? 
>>>> WRT the per-program mode, I'm also in favor of a unified cluster-side
>>>> execution for user programs, so +1 from my side.
>>>>
>>>> But I think there may be some value in the current per-job mode: we now
>>>> have some common resources available on the client machine that are
>>>> read by main methods in user programs. If migrated to per-program mode,
>>>> we would have to explicitly set the specific resources for each user
>>>> program and ship them to the cluster, which would be a bit
>>>> inconvenient. Also, as the job graph is compiled at the client, we can
>>>> recognize errors caused by user code before starting the cluster, and
>>>> we can easily get access to the logs.
>>>>
>>>> Best,
>>>> Paul Lam
>>>>
>>>> On Oct 30, 2019, at 16:22, SHI Xiaogang <[hidden email]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Thanks for bringing this up.
>>>>
>>>> The design looks very nice to me in that:
>>>>
>>>> 1. In the new per-job mode, we don't need to compile user programs in
>>>> the client and can directly run user programs with user jars. That way,
>>>> it's easier to achieve resource isolation in multi-tenant platforms,
>>>> and it is much safer.
>>>>
>>>> 2. The execution of user programs can be unified across session and
>>>> per-job modes. In session mode, user jobs are submitted via a remote
>>>> ClusterClient, while in per-job mode user jobs are submitted via a
>>>> local ClusterClient.
>>>>
>>>> Regards,
>>>> Xiaogang
>>>>
>>>> tison <[hidden email]> wrote on Wed, Oct 30, 2019 at 3:30 PM:
>>>>
>>>> (CC user list because I think users may have ideas on how per-job mode
>>>> should look like)
>>>>
>>>> Hi all,
>>>>
>>>> In the discussion about Flink on k8s[1] we encounter a problem that
>>>> opinions diverge in how so-called per-job mode works.
This thread is aimed at >>>> stating >>>> a dedicated discussion about per-job semantic and how to implement it. >>>> >>>> **The AS IS per-job mode** >>>> >>>> * in standalone deployment, we bundle user jar with Flink jar, retrieve >>>> JobGraph which is the very first JobGraph from user program in >>>> classpath, >>>> and then start a Dispatcher with this JobGraph preconfigured, which >>>> launches it as "recovered" job. >>>> >>>> * in YARN deployment, we accept submission via CliFrontend, extract >>>> JobGraph >>>> which is the very first JobGraph from user program submitted, serialize >>>> the JobGraph and upload it to YARN as resource, and then when AM starts, >>>> retrieve the JobGraph as resource and start Dispatcher with this >>>> JobGraph >>>> preconfigured, follows are the same. >>>> >>>> Specifically, in order to support multiple parts job, if YARN deployment >>>> configured as "attached", it starts a SessionCluster, proceeds the >>>> progress >>>> and shutdown the cluster on job finished. >>>> >>>> **Motivation** >>>> >>>> The implementation mentioned above, however, suffers from problems. The >>>> major >>>> two of them are 1. only respect the very first JobGraph from user >>>> program 2. >>>> compile job in client side >>>> >>>> 1. Only respect the very first JobGraph from user program >>>> >>>> There is already issue about this topic[2]. As we extract JobGraph from >>>> user >>>> program by hijacking Environment#execute we actually abort any execution >>>> after the first call to #execute. Besides it surprises users many times >>>> that >>>> any logic they write in the program is possibly never executed, here the >>>> problem is that the semantic of "job" from Flink perspective. I'd like >>>> to say >>>> in current implementation "per-job" is actually "per-job-graph". >>>> However, >>>> in practices since we support jar submission it is "per-program" >>>> semantic >>>> wanted. >>>> >>>> 2. 
Compile job in client side >>>> >>>> Well, standalone deployment is not in the case. But in YARN deployment, >>>> we >>>> compile job and get JobGraph in client side, and then upload it to YARN. >>>> This approach, however, somehow breaks isolation. We have observed that >>>> user >>>> program contains exception handling logic which call System.exit in main >>>> method, which causes a compilation of the job exit the whole client at >>>> once. >>>> It is a critical problem if we manage multiple Flink job in a unique >>>> platform. >>>> In this case, it shut down the whole service. >>>> >>>> Besides there are many times I was asked why per-job mode doesn't run >>>> "just like" session mode but with a dedicated cluster. It might imply >>>> that >>>> current implementation mismatches users' demand. >>>> >>>> **Proposal** >>>> >>>> In order to provide a "per-program" semantic mode which acts "just >>>> like" session >>>> mode but with a dedicated cluster, I propose a workflow as below. It >>>> acts like >>>> starting a drive on cluster but is not a general driver solution as >>>> proposed >>>> here[3], the main purpose of the workflow below is for providing a >>>> "per-program" >>>> semantic mode. >>>> >>>> *From CliFrontend* >>>> >>>> 1. CliFrontend receives submission, gathers all configuration and >>>> starts a >>>> corresponding ClusterDescriptor. >>>> >>>> 2. ClusterDescriptor deploys a cluster with main class >>>> ProgramClusterEntrypoint >>>> while shipping resources including user program. >>>> >>>> 3. ProgramClusterEntrypoint#main contains logic starting components >>>> including >>>> Standalone Dispatcher, configuring user program to start a >>>> RpcClusterClient, >>>> and then invoking main method of user program. >>>> >>>> 4. RpcClusterClient acts like MiniClusterClient which is able to submit >>>> the >>>> JobGraph after leader elected so that we don't fallback to round-robin >>>> or >>>> fail submission due to no leader. >>>> >>>> 5. 
Whether or not deliver job result depends on user program logic, >>>> since we >>>> can already get a JobClient from execute. ProgramClusterEntrypoint >>>> exits on >>>> user program exits and all jobs submitted globally terminate. >>>> >>>> This way fits in the direction of FLIP-73 because strategy starting a >>>> RpcClusterClient can be regarded as a special Executor. After >>>> ProgramClusterEntrypoint#main starts a Cluster, it generates and passes >>>> configuration to >>>> user program so that when Executor generated, it knows to use a >>>> RpcClusterClient >>>> for submission and the address of Dispatcher. >>>> >>>> **Compatibility** >>>> >>>> In my opinion this mode can be totally an add-on to current codebase. We >>>> actually don't replace current per-job mode with so-called >>>> "per-program" mode. >>>> It happens that current per-job mode would be useless if we have such >>>> "per-program" mode so that we possibly deprecate it for preferring the >>>> other. >>>> >>>> I'm glad to discuss more into details if you're interested in, but >>>> let's say >>>> we'd better first reach a consensus on the overall design :-) >>>> >>>> Looking forward to your reply! >>>> >>>> Best, >>>> tison. 
>>>> [1] https://issues.apache.org/jira/browse/FLINK-9953
>>>> [2] https://issues.apache.org/jira/browse/FLINK-10879
>>>> [3] https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#
>>>>
>>>> ------------------------------
>>>>
>>>> Your Personal Data: We may collect and process information about you
>>>> that may be subject to data protection laws. For more information about
>>>> how we use and disclose your personal data, how we protect your
>>>> information, our legal basis to use your information, your rights and
>>>> who you can contact, please refer to: www.gs.com/privacy-notices
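[Editorial note] A recurring point in this thread is that compiling the user program inside the client/service process breaks isolation: a user main method that calls System.exit can take down the whole submission service. The failure mode, and the process-isolation fix, can be reproduced with a minimal, Flink-independent sketch. All names here (`compile_in_process`, `compile_in_subprocess`, `USER_PROGRAM`) are illustrative and not part of any Flink API.

```python
import subprocess
import sys

# A "user program" whose main logic calls exit() during compilation.
# (Hypothetical stand-in for a user jar's main method.)
USER_PROGRAM = "import sys; sys.exit(7)"

def compile_in_process(program: str) -> str:
    """Client-side approach: run user code inside the service process.
    If the user code calls sys.exit, SystemExit propagates and, if
    uncaught, kills the whole submission service."""
    exec(program)  # raises SystemExit(7) right here
    return "job graph"

def compile_in_subprocess(program: str) -> str:
    """Isolated approach: run user code in a child process, so an
    unexpected exit fails only this one submission."""
    result = subprocess.run([sys.executable, "-c", program])
    if result.returncode != 0:
        return f"submission failed (exit code {result.returncode})"
    return "job graph"

# In-process: SystemExit escapes and would terminate the service
# unless every call site remembers to catch it.
try:
    compile_in_process(USER_PROGRAM)
except SystemExit as e:
    print(f"service would have exited with code {e.code}")

# Subprocess: the service survives and can report the failure.
print(compile_in_subprocess(USER_PROGRAM))
```

This is the same trade-off the thread discusses for YARN: moving job-graph generation out of the shared client process (into the cluster entrypoint, or any separate process) contains misbehaving user code.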
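[Editorial note] Several participants (Billy, Regina, Flavio) describe the same pattern: one job per table/source instead of one huge graph, so that a single failure does not sink the rest. Stripped of the Flink API, the control flow is a plain loop in the program's main method, which is exactly the part the current per-job mode cuts short after the first submission. A Flink-free sketch under that assumption (`submit_job` is a hypothetical stand-in for building a pipeline and calling env#execute; it is not a Flink API):

```python
def submit_job(table: str) -> str:
    # In a real program this would build a pipeline for `table` and
    # call env.execute(table); here we simulate one failing table.
    if table == "corrupt_table":
        raise RuntimeError(f"job for {table} failed")
    return f"{table}: OK"

def main(tables):
    # One independent job per table: a failure is recorded without
    # aborting the other submissions -- the isolation the thread
    # participants want, and something "per-job-graph" mode cannot
    # express because it stops after the first job.
    results = {}
    for table in tables:
        try:
            results[table] = submit_job(table)
        except RuntimeError as e:
            results[table] = f"FAILED: {e}"
    return results

print(main(["orders", "corrupt_table", "users"]))
```

Under the proposed per-program mode, this whole loop would run inside the ProgramClusterEntrypoint, with each submission going through the local RpcClusterClient.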