Hi all,
Currently, cloud-native architectures have been adopted by many companies in production. They use Kubernetes to run deep learning, web servers, etc. If we could deploy per-job/session Flink clusters on Kubernetes so that they run mixed with other workloads, cluster resource utilization would be better. Also, many Kubernetes users would find it easier to try Flink.

By now we have three options to run Flink jobs on k8s:

[1]. Create jm/tm/service YAML files and apply them; you will get a standalone Flink cluster on k8s. Use flink run to submit jobs to the existing Flink cluster. Some companies may have their own deployment system to manage the Flink cluster.

[2]. Use a flink-k8s-operator to manage multiple Flink clusters, including session and per-job clusters. It can manage the complete deployment lifecycle of the application. I think this option is really easy to use for k8s users. They are familiar with k8s operators, kubectl, and other k8s tools, and they can debug and run Flink clusters just like other k8s applications.

[3]. Native integration with k8s: use flink run or kubernetes-session.sh to start a Flink cluster. It is very similar to submitting a Flink cluster to Yarn. The KubernetesClusterDescriptor talks to the k8s API server to start a Flink master deployment with one replica. The KubernetesResourceManager dynamically allocates resources from k8s to start task managers on demand. This option is very easy for Flink users to get started with: in the simplest case, we just need to change '-m yarn-cluster' to '-m kubernetes-cluster'.

We have made an internal implementation of option [3] and use it in production. After it is fully tested, we hope to contribute it to the community. Now we want to get some feedback about the three options. Any comments are welcome.

> What do we need to prepare when starting a Flink cluster on k8s using the native integration?

Download the Flink release binary and create the ~/.kube/config file corresponding to the k8s cluster. That is all you need.
> Flink Session cluster

* Start a session cluster:

./bin/kubernetes-session.sh -d -n 2 -tm 512 -s 4 -nm flink-session-example -i flink:latest -kD kubernetes.service.exposed.type=NODE_PORT

* You will get an address to submit jobs to; specify it through the '-ksa' option:

./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-session-example -ksa {x.x.x.x:12345} examples/streaming/WindowJoin.jar

> Flink Job Cluster

* Running with the official Flink image:

./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-perjob-example-1 -ki flink:latest examples/streaming/WindowJoin.jar

* Running with a user image:

./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-perjob-example-1 -ki flink-user:latest examples/streaming/WindowJoin.jar

[1]. https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html
[2]. https://github.com/lyft/flinkk8soperator
[3]. https://docs.google.com/document/d/1Zmhui_29VASPcBOEqyMWnF3L6WEWZ4kedrCqya0WaAk/edit#
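For readers less familiar with option [1], a minimal sketch of the kind of jm/service resources it refers to can be built as plain dicts (which could be serialized to YAML and passed to `kubectl apply`). The image name, port numbers, and labels here are illustrative assumptions, not the official manifests:

```python
def jobmanager_deployment(cluster_id: str, image: str = "flink:latest") -> dict:
    """Build a single-replica JobManager Deployment manifest (assumed layout)."""
    labels = {"app": "flink", "component": "jobmanager", "cluster": cluster_id}
    return {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": f"{cluster_id}-jobmanager", "labels": labels},
        "spec": {
            "replicas": 1,  # exactly one Flink master, as in option [3]'s deployment of 1
            "selector": {"matchLabels": labels},
            "template": {
                "metadata": {"labels": labels},
                "spec": {
                    "containers": [{
                        "name": "jobmanager",
                        "image": image,
                        "args": ["jobmanager"],
                        "ports": [
                            {"containerPort": 6123, "name": "rpc"},
                            {"containerPort": 8081, "name": "ui"},
                        ],
                    }],
                },
            },
        },
    }


def jobmanager_service(cluster_id: str) -> dict:
    """Build the Service that exposes the JobManager to TaskManagers."""
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": f"{cluster_id}-jobmanager"},
        "spec": {
            "selector": {"app": "flink", "component": "jobmanager",
                         "cluster": cluster_id},
            "ports": [{"port": 6123, "name": "rpc"},
                      {"port": 8081, "name": "ui"}],
        },
    }
```

A TaskManager Deployment would look the same minus the Service, with more replicas; this is exactly the boilerplate that options [2] and [3] try to hide from the user.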
Thanks Yang. K8s native integration is very necessary and important for the adoption of Flink, IMO. I notice that the design doc was written in 2018; are there any changes or updates?

>>> Download the Flink release binary and create the ~/.kube/config file corresponding to the k8s cluster. That is all you need.

How can I specify which k8s cluster to run on in case I have multiple k8s clusters? Can I do it by specifying the Flink cluster in the Flink CLI?

--
Best Regards

Jeff Zhang
Thanks for bringing this up. Obviously, options 2 and 3 are both useful for Flink users on Kubernetes. But option 3 is easy for users who do not know many Kubernetes concepts; they can start Flink on Kubernetes quickly, so I think it should have a higher priority.

I have worked for some time on integrating Flink with our Kubernetes-based platform, and have some concerns about option 3 from the platform user's perspective.

First, I think users can be divided into common users and downstream platform users.

For common users, kubernetes-session.sh (or yarn-session.sh) is convenient: just run the shell script and get the JobManager address, then run ./bin/flink to submit a job.

But for platform users, the shell scripts are not friendly to integrate with. I need to use Java ProcessBuilder to run a shell script and redirect stdout/stderr, parse the stdout log to get the jobId, handle the exit code, and add idempotence logic to avoid submitting duplicate jobs.

The way our platform integrates with Flink on k8s is:

1. Generate a job id, and prepare jobmanager/taskmanager/service/configmap resource files. In the jobmanager and taskmanager resource files, we define an initContainer to download the user jar from http/hdfs/s3/..., so the user jar is already on the jm and tm pods before they start. StandaloneJobClusterEntryPoint can accept "--job-id" to pass the pre-generated jobId and "--job-classname" to pass the user jar entry class and other args [1].

2. Submit the resource files to k8s directly, and that is all. No other steps are needed (e.g. uploading/submitting the jar to Flink), and k8s guarantees idempotence naturally: identical resources will be ignored.

3. Just use the pre-configured job id to query status; the platform knows the job id.

The above steps are convenient for platform users. So my concerns about option 3 are:

1. Besides using kubernetes-session.sh to submit a job, can we retain the ability to let users submit k8s resource files directly, rather than forcing them to submit jobs from shell scripts? As you know, everything in Kubernetes is a resource, and submitting a resource to Kubernetes is more natural.

2. Retain the ability to pass job-classname to start a Flink Job Cluster, so that platform users do not need a separate step to submit the jar, whether from ./bin/flink or from the REST API. And for a Flink Session Cluster, platform users can submit Kubernetes resource files to start the session cluster, and then submit jar jobs via the REST API to avoid calling the shell scripts.

3. Retain the ability to pass job-id. It is not convenient to find which job id you have just submitted, whether by parsing the submit log or by querying the JobManager REST API. And it is impossible to find the jobId in the session-cluster scenario, where there may be many jobs with the same name and the same submit time.

I think it's better to retain these features, already provided by the StandaloneJobClusterEntryPoint, in option 3. This will make Flink easier to integrate with other Kubernetes-based platforms.

Thanks
Kaibo

[1]. https://github.com/apache/flink/blob/master/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java#L45
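The three platform steps above can be sketched in a few lines: pre-generate a Flink job id (32 hex characters) and bake it, together with the entry class, into the JobManager container arguments, with an initContainer fetching the user jar first. The helper image, mount path, and argument layout are illustrative assumptions, not our platform's actual manifests:

```python
import uuid


def new_job_id() -> str:
    """A Flink JobID is 16 bytes, i.e. 32 lowercase hex characters."""
    return uuid.uuid4().hex


def jobmanager_pod_spec(job_id: str, job_class: str, jar_url: str) -> dict:
    """Pod spec whose initContainer downloads the user jar before startup."""
    return {
        "initContainers": [{
            "name": "fetch-user-jar",
            # Assumption: any curl-capable helper image works here.
            "image": "curlimages/curl:latest",
            "command": ["curl", "-fsSL", "-o",
                        "/opt/flink/usrlib/job.jar", jar_url],
            "volumeMounts": [{"name": "usrlib",
                              "mountPath": "/opt/flink/usrlib"}],
        }],
        "containers": [{
            "name": "jobmanager",
            "image": "flink:latest",
            # StandaloneJobClusterEntryPoint accepts --job-id and
            # --job-classname, per reference [1] in the message above.
            "args": ["standalone-job",
                     "--job-id", job_id,
                     "--job-classname", job_class],
            "volumeMounts": [{"name": "usrlib",
                              "mountPath": "/opt/flink/usrlib"}],
        }],
        "volumes": [{"name": "usrlib", "emptyDir": {}}],
    }
```

Because the job id is chosen before submission, the platform can poll job status by that id immediately, with no log parsing.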
Hi Jeff,
Thank you for your attention. You are right, the design doc is out of date. I will try to contact Till and Sun Jin to get edit permission, and then update the design doc.

Currently we get the k8s cluster information from the kube config file. You may have multiple contexts in your kube config file; just set current-context to the cluster you want to submit the Flink cluster to. Also, I think we could use a Flink config option (API server address or context name) to override the current-context.

Best,
Yang
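The context-selection logic Yang describes boils down to: use current-context from the kubeconfig unless a config option overrides it. A minimal sketch, with the kubeconfig shown as an already-parsed dict (the real file is YAML) and the override parameter name being an assumption:

```python
from typing import Optional


def resolve_api_server(kubeconfig: dict,
                       context_override: Optional[str] = None) -> str:
    """Return the API server URL for the chosen kubeconfig context.

    Falls back to current-context when no override is given, mirroring
    the behavior proposed in the message above.
    """
    name = context_override or kubeconfig["current-context"]
    context = next(c for c in kubeconfig["contexts"] if c["name"] == name)
    cluster_name = context["context"]["cluster"]
    cluster = next(c for c in kubeconfig["clusters"]
                   if c["name"] == cluster_name)
    return cluster["cluster"]["server"]
```

With two contexts configured, `resolve_api_server(cfg)` picks the current-context cluster, while `resolve_api_server(cfg, "prod")` submits to the other one without editing the kubeconfig.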
Hi Kaibo,
I really appreciate that you could share your use case. As you say, our users in production can also be divided into two groups. The common users have more knowledge about Flink; they use the command line to submit jobs and debug them from the JobManager and TaskManager logs in Kubernetes. The platform users use YAML config files or a platform web UI to submit Flink jobs.

Regarding your comments:

1. Of course, option 1 (standalone on k8s) should always work as expected. Users can submit the jm/tm/svc resource files to start a Flink cluster. Option 3 (k8s native integration) will support both resource-file and command-line submission. The resource file below creates a Flink per-job cluster:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: flink-word-count
spec:
  image: flink-wordcount:latest
  flinkConfig:
    state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
  jobManagerConfig:
    resources:
      requests:
        memory: "1024Mi"
        cpu: "1"
  taskManagerConfig:
    taskSlots: 2
    resources:
      requests:
        memory: "1024Mi"
        cpu: "1"
  jobId: "aaaabbbbccccddddaaaabbbbccccdddd"
  parallelism: 3
  jobClassName: "org.apache.flink.streaming.examples.wordcount.WordCount"

2. The ability to pass job-classname will be retained. The class should be found on the classpath of the taskmanager image. The Flink per-job cluster described by the YAML resource in section 1 could also be submitted by the flink command:

flink run -m kubernetes-cluster -p 3 -knm flink-word-count -ki flink-wordcount:latest -kjm 1024 -ktm 1024 -kD kubernetes.jobmanager.cpu=1 -kD kubernetes.taskmanager.cpu=1 -kjid aaaabbbbccccddddaaaabbbbccccdddd -kjc org.apache.flink.streaming.examples.wordcount.WordCount -kD state.checkpoints.dir=file:///checkpoints/flink/externalized-checkpoints

3. The job-id can also be specified via -kjid, as in the command above.

In a nutshell, option 3 should have all the abilities of option 1. Common users and platform users are all satisfied.

Best,
Yang
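The -kD flags in Yang's command each carry a key=value Flink configuration entry. As a rough illustration (this parser is an assumption for explanation, not Flink's actual CLI code), collecting them into a flat config map could look like:

```python
def parse_dynamic_options(argv: list) -> dict:
    """Collect every `-kD key=value` pair from a CLI argument list."""
    opts = {}
    i = 0
    while i < len(argv):
        if argv[i] == "-kD" and i + 1 < len(argv):
            # Split on the first '=' only, so values may contain '='.
            key, _, value = argv[i + 1].partition("=")
            opts[key] = value
            i += 2
        else:
            i += 1
    return opts
```

This is the same pattern as Yarn's -yD options: anything not covered by a dedicated flag (-kjm, -ktm, -kjid, ...) can still be set through the generic config map.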
Hi Yang,
thanks for reviving the discussion about Flink's Kubernetes integration. In a nutshell, I think that Flink should support options 1) and 3). Concretely, option 1) would be covered by the reactive mode [1], which is not necessarily bound to Kubernetes and works equally well in all environments. Option 3) is the native Kubernetes integration which is described in the design document. Actually, that discussion was already concluded some time ago and there are multiple PRs open for adding this feature [2]. So maybe you could check these PRs out and help the community review and merge this code. Based on this we could then think about which additions/improvements are necessary.

For option 2), I think a Kubernetes operator would be a good project for Flink's ecosystem website [3] and does not necessarily need to be part of Flink's repository.

[1] https://issues.apache.org/jira/browse/FLINK-10407
[2] https://issues.apache.org/jira/browse/FLINK-9953
[3] https://lists.apache.org/thread.html/9b873f9dc1dd56d79e0f71418b123def896ed02f57e84461bc0bacb0@%3Cdev.flink.apache.org%3E

Cheers,
Till

On Mon, Aug 12, 2019 at 5:46 AM Yang Wang <[hidden email]> wrote:

> Hi Kaibo,
>
> I really appreciate that you shared your use case.
>
> As you say, our users in production can also be divided into two groups.
> The common users have more knowledge about Flink; they use the command
> line to submit jobs and debug them from the logs of the JobManager and
> TaskManagers in Kubernetes. The platform users use yaml config files or
> a platform web UI to submit Flink jobs.
>
> Regarding your comments:
>
> 1. Of course, option 1 (standalone on k8s) should always work as
> expected. Users could submit the jm/tm/svc resource files to start a
> Flink cluster. Option 3 (k8s native integration) will support both
> resource files and command-line submission. The resource file below
> creates a Flink per-job cluster.
>
> apiVersion: extensions/v1beta1
> kind: Deployment
> metadata:
>   name: flink-word-count
> spec:
>   image: flink-wordcount:latest
>   flinkConfig:
>     state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
>   jobManagerConfig:
>     resources:
>       requests:
>         memory: "1024Mi"
>         cpu: "1"
>   taskManagerConfig:
>     taskSlots: 2
>     resources:
>       requests:
>         memory: "1024Mi"
>         cpu: "1"
>   jobId: "aaaabbbbccccddddaaaabbbbccccdddd"
>   parallelism: 3
>   jobClassName: "org.apache.flink.streaming.examples.wordcount.WordCount"
>
> 2. The ability to pass job-classname will be retained. The class should
> be found on the classpath of the TaskManager image. The Flink per-job
> cluster described by the yaml resource in section 1 can also be
> submitted with the flink command:
>
> flink run -m kubernetes-cluster -p 3 -knm flink-word-count -ki
> flink-wordcount:latest -kjm 1024 -ktm 1024 -kD kubernetes.jobmanager.cpu=1
> -kD kubernetes.taskmanager.cpu=1 -kjid aaaabbbbccccddddaaaabbbbccccdddd
> -kjc org.apache.flink.streaming.examples.wordcount.WordCount -kD
> state.checkpoints.dir=file:///checkpoints/flink/externalized-checkpoints
>
> 3. The job-id can also be specified with -kjid, as in the command above.
>
> In a nutshell, option 3 should have all the abilities of option 1, so
> both common users and platform users are satisfied.
>
> Best,
> Yang
>
> Kaibo Zhou <[hidden email]> wrote on Sun, Aug 11, 2019 at 1:23 PM:
>
> > Thanks for bringing this up. Obviously, options 2 and 3 are both
> > useful for Flink users on Kubernetes. But option 3 is easy for users
> > that do not know many Kubernetes concepts; they can start Flink on
> > Kubernetes quickly, so I think it should have a higher priority.
> >
> > I have worked for some time on integrating Flink with our platform
> > based on Kubernetes, and have some concerns about option 3 from the
> > platform user's perspective.
> >
> > First, I think users can be divided into common users and downstream
> > platform users.
> >
> > For common users, kubernetes-session.sh (or yarn-session.sh) is
> > convenient: just run the shell script, get the JobManager address,
> > and then run ./bin/flink to submit a job.
> >
> > But for platform users, the shell scripts are not friendly to
> > integrate. I need to use a Java ProcessBuilder to run the script and
> > redirect stdout/stderr, parse the stdout log to get the jobId, handle
> > the exit code, and add idempotence logic to avoid submitting
> > duplicate jobs.
> >
> > The way our platform integrates with Flink on k8s is:
> > 1. Generate a job id, and prepare the
> > jobmanager/taskmanager/service/configmap resource files. In the
> > jobmanager and taskmanager resource files, we define an initContainer
> > to download the user jar from http/hdfs/s3..., so the user jar is
> > already on the jm and tm pods before they start. And
> > StandaloneJobClusterEntryPoint can accept "--job-id" to pass the
> > pre-generated jobId and "--job-classname" to pass the user jar entry
> > class and other args [1].
> >
> > 2. Submit the resource files to k8s directly, and that is all. No
> > other steps are needed, e.g. uploading/submitting the jar to Flink,
> > and k8s guarantees idempotence naturally: identical resources are
> > ignored.
> >
> > 3. Just use the pre-configured job id to query status; the platform
> > already knows the job id.
> >
> > The above steps are convenient for platform users. So my concerns for
> > option 3 are:
> > 1. Besides using kubernetes-session.sh to submit a job, can we retain
> > the ability to let users submit k8s resource files directly, without
> > being forced to submit jobs from shell scripts? As you know,
> > everything in Kubernetes is a resource, and submitting a resource to
> > Kubernetes is more natural.
> >
> > 2. Retain the ability to pass job-classname to start a Flink job
> > cluster, so that platform users do not need a separate step to submit
> > the jar, whether from ./bin/flink or from the REST API. And for a
> > Flink session cluster, platform users can submit Kubernetes resource
> > files to start the session cluster, and then submit jar jobs through
> > the REST API to avoid calling the shell scripts.
> >
> > 3. Retain the ability to pass job-id. It is not convenient or
> > friendly to find the job id you have just submitted, whether by
> > parsing the submission log or by querying the JobManager REST API.
> > And it is impossible to find the jobId in the session cluster
> > scenario, where there will be many jobs with the same name and the
> > same submission time.
> >
> > I think it's better to retain these features already provided by the
> > StandaloneJobClusterEntryPoint in option 3. This will make Flink
> > easier to integrate with other platforms based on Kubernetes.
> >
> > Thanks
> > Kaibo
> >
> > [1]. https://github.com/apache/flink/blob/master/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java#L45
> >
> > Jeff Zhang <[hidden email]> wrote on Sat, Aug 10, 2019 at 1:52 PM:
> >
> > > Thanks Yang. K8s native integration is very necessary and important
> > > for the adoption of Flink, IMO. I notice that the design doc was
> > > written in 2018; are there any changes or updates?
> > >
> > > >>> Download the flink release binary and create the ~/.kube/config
> > > file corresponding to the k8s cluster. It is all what you need.
> > >
> > > How can I specify which k8s cluster to run on in case I have
> > > multiple k8s clusters? Can I do it by specifying the Flink cluster
> > > in the Flink CLI?
> > >
> > > Yang Wang <[hidden email]> wrote on Fri, Aug 9, 2019 at 9:12 PM:
 |
Hi Till,
Thanks for your reply. I agree with you that both options 1 and 3 need to be supported. Option 1 is the reactive mode of resource management, where Flink is not aware of the underlying cluster. If a user has limited resources to run Flink jobs, this option will be very useful. On the other side, option 3 is active resource management. Compared with option 1, its biggest advantage is that we can allocate resources from the k8s cluster on demand; batch jobs especially will benefit a lot from this.

I do not mean to abandon the proposal and implementation in FLINK-9953. Actually, I have contacted the assignee (Chunhui Shi) to help review and test the PRs. After all the basic implementations have been merged, the production features will be considered.

Best,
Yang

Till Rohrmann <[hidden email]> wrote on Tue, Aug 13, 2019 at 5:36 PM:
 |
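The reactive/active distinction discussed above can be caricatured in a few lines. This is purely illustrative pseudologic under assumed function names, not Flink's scheduler: active mode starts from the job's required slots and asks the cluster manager for enough TaskManager pods, while reactive mode starts from whatever TaskManagers exist and derives the parallelism from them:

```python
import math

def tms_needed_active(required_slots, slots_per_tm):
    """Active mode: request exactly enough TaskManagers to cover the job's slots."""
    return math.ceil(required_slots / slots_per_tm)

def parallelism_reactive(running_tms, slots_per_tm):
    """Reactive mode: scale the job to whatever resources are currently present."""
    return running_tms * slots_per_tm

# A job with parallelism 4 on TaskManagers offering 2 slots each:
assert tms_needed_active(4, 2) == 2      # active mode requests 2 TM pods on demand
assert parallelism_reactive(3, 2) == 6   # reactive mode runs at p=6 given 3 TMs
```

This is why batch jobs benefit from active mode: resources are requested only when a stage actually needs them, instead of the job adapting to a fixed pool.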
Thanks Yang for bringing this up. I think option 1 is very useful for early adopters. People who do not know much about k8s can easily set it up on minikube to have a taste.

Between options 2 and 3, I prefer option 3 because I am familiar with YARN and don't have much background in k8s. And I have a doubt about starting a session cluster in option 3:

> ./bin/kubernetes-session.sh -d -n 2 -tm 512 -s 4 -nm flink-session-example
> -i flink:latest -kD kubernetes.service.exposed.type=NODE_PORT

Does the -n option mean the number of TaskManagers? Do we pre-start TaskManager pods, or request and launch them dynamically?

*Best Regards,*
*Zhenghua Gao*

On Fri, Aug 9, 2019 at 9:12 PM Yang Wang <[hidden email]> wrote:
 |
Till had already summed it up, but I want to emphasize that Flink as a project only needs to provide #1 (reactive mode) and #3 (active mode, which necessarily is tied to the cluster manager of choice). The latter would be needed for Flink jobs to be elastic (in the future), although we may want to discuss how such a capability can be made easier with #1 as well.

For users, #1 alone is of little value, since they still need to solve their deployment problem. So it will be good to list options such as the Lyft Flink k8s operator on the ecosystem page and possibly point to that from the Flink documentation as well.

I also want to point out that #3, while it looks easy to start with, has an important limitation when it comes to managing long-running streaming applications. Such an application essentially will be a sequence of jobs that come and go across stateful upgrades or rollbacks. Any solution that is designed to manage a single Flink job instance can't address that need. That is why the k8s operator was created. It specifically understands the concept of an application.

Thomas

On Wed, Aug 28, 2019 at 7:56 PM Zhenghua Gao <[hidden email]> wrote:
 |
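Thomas's application-lifecycle point can be made concrete with a toy reconcile loop. This sketches the general Kubernetes operator pattern, not the Lyft operator's actual code, and every field and action name here is made up: the operator repeatedly compares the declared application spec against the observed cluster state, which is what lets it drive a *sequence* of jobs across stateful upgrades rather than managing one job instance:

```python
def reconcile(desired, observed):
    """One step of a toy operator control loop for a Flink application.

    Returns the action to take next; a real operator would also trigger
    savepoints, roll deployments, watch pods, etc. All names are illustrative.
    """
    if observed.get("phase") is None:
        return "create-cluster"
    if desired["image"] != observed.get("image"):
        # Stateful upgrade: stop the running job with a savepoint and start
        # a new job from it -- a new job within the same application.
        return "savepoint-and-redeploy"
    if observed["phase"] == "Failed":
        return "restart-from-checkpoint"
    return "no-op"

assert reconcile({"image": "app:v1"}, {}) == "create-cluster"
assert reconcile({"image": "app:v2"},
                 {"phase": "Running", "image": "app:v1"}) == "savepoint-and-redeploy"
assert reconcile({"image": "app:v1"},
                 {"phase": "Running", "image": "app:v1"}) == "no-op"
```

A per-job submission command (option 3) corresponds only to the "create-cluster" branch; the upgrade and recovery branches are what an operator adds on top.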
Hi Zhenghua,
You are right. For per-job cluster, the taskmanagers will be allocated dynamically by KubernetesResourceManager. For session cluster, we hope taskmangers could be pre-allocated even though it does not work now. Please navigate to the doc[1] for more details. Hi Thomas, We have no doubt that flink only need to support #1 and #3. For #1, we need external deployment management tools to make it in production. I also think kubernetes operator is good choice. It makes managing multiple flink jobs and long running streaming applications easier. Also in some companies, they have their own flink job management platform. Platform users submit flink job through webui. Update the flink configuration and restart the the job. For #3, we just want to make it possible to start flink job cluster and session cluster through cli. These users who used to run flink workloads on yarn are very convenient to migrate to kubernetes cluster. Compared to #1, the dynamic resource allocation is an important advantage. Maybe it could also be introduced to #1 in the future by some way. [1]. https://docs.google.com/document/d/1-jNzqGF6NfZuwVaFICoFQ5HFFXzF5NVIagUZByFMfBY/edit?usp=sharing Thomas Weise <[hidden email]> 于2019年8月29日周四 下午10:24写道: > Till had already summed it up, but I want to emphasize that Flink as > project only needs to provide #1 (reactive mode) and #3 (active mode, which > necessarily is tied to the cluster manager of choice). The latter would be > needed for Flink jobs to be elastic (in the future), although we may want > to discuss how such capability can be made easier with #1 as well. > > For users #1 alone is of little value, since they need to solve their > deployment problem. So it will be good to list options such as the Lyft > Flink k8s operator on the ecosystem page and possibly point to that from > the Flink documentation as well. 
> I also want to point out that #3, while it looks easy to start with, has
> an important limitation when it comes to managing long-running streaming
> applications. Such an application will essentially be a sequence of jobs
> that come and go across stateful upgrades or rollbacks. Any solution that
> is designed to manage a single Flink job instance can't address that need.
> That is why the k8s operator was created. It specifically understands the
> concept of an application.
>
> Thomas
>
> On Wed, Aug 28, 2019 at 7:56 PM Zhenghua Gao <[hidden email]> wrote:
>
> > Thanks Yang for bringing this up. I think option 1 is very useful for
> > early adopters. People who do not know much about k8s can easily set it
> > up on minikube to have a taste.
> >
> > For option 2 and option 3, I prefer option 3, because I am familiar with
> > YARN and don't have much of a concept of k8s. And I have some doubt
> > about starting a session cluster in option 3:
> >
> > > ./bin/kubernetes-session.sh -d -n 2 -tm 512 -s 4 -nm flink-session-example
> > > -i flink:latest -kD kubernetes.service.exposed.type=NODE_PORT
> >
> > Does the -n option mean the number of TaskManagers?
> > Do we pre-run the taskmanager pods, or request and launch them
> > dynamically?
> >
> > *Best Regards,*
> > *Zhenghua Gao*
> >
> > On Fri, Aug 9, 2019 at 9:12 PM Yang Wang <[hidden email]> wrote:
> >
> > > [...]
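To make option #1 a bit more concrete: in its simplest form it is just a few manifests applied with kubectl. The sketch below is only illustrative (the image tag, ports, and labels are my assumptions, not part of the proposal); the Flink 1.8 kubernetes docs linked in the original mail have the complete jobmanager/taskmanager/service yamls.

```
# Minimal sketch of a standalone session cluster on k8s (option #1).
# A taskmanager Deployment, omitted here, would look very similar.
kubectl apply -f - <<'EOF'
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: flink:latest
          args: ["jobmanager"]
          ports:
            - containerPort: 6123   # RPC
            - containerPort: 8081   # REST / web UI
---
# Service so the taskmanagers and the CLI can reach the jobmanager.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  selector:
    app: flink
    component: jobmanager
  ports:
    - name: rpc
      port: 6123
    - name: rest
      port: 8081
EOF
```

After that, jobs are submitted with a plain `./bin/flink run -m <jobmanager-address>:8081 ...`, which is exactly the "submit to an existing standalone cluster" workflow described in the original mail.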
Hi dev and users,
I just want to revive this discussion because we have made some meaningful progress on the Kubernetes native integration. I have made a draft implementation to complete the PoC; the CLI and job submission are both working as expected. The design doc[1] has been updated, including the detailed submission process, the CLI and yaml user interfaces, and the implementation plan. All comments and suggestions are welcome.

BTW, we gave a talk in the "Big Data Ecosystem" session at the Alibaba Apsara Conference last Friday[2]. We heard there that many companies and users are planning to migrate their big data workloads to Kubernetes clusters. Through mixed-running with online services, they can get better resource utilization and reduce cost. Flink is an important case here, and dynamic resource allocation is the basic requirement. That's why we want to move this forward faster.

Best,
Yang

[1]. https://docs.google.com/document/d/1-jNzqGF6NfZuwVaFICoFQ5HFFXzF5NVIagUZByFMfBY/edit?usp=sharing
[2]. https://www.alibabacloud.com/zh/apsara-conference-2019?spm=a2c4e.11165380.1395221.13

Yang Wang <[hidden email]> wrote on Fri, Aug 30, 2019 at 2:23 PM:

> [...]
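For anyone who wants to follow along with the PoC, the end-to-end flow of option #3 from the original proposal condenses to a few commands (the address x.x.x.x:12345 is a placeholder, and the -k* option names come from the draft implementation and may still change):

```
# Native integration (option #3): start a session cluster on k8s.
./bin/kubernetes-session.sh -d -n 2 -tm 512 -s 4 -nm flink-session-example \
  -i flink:latest -kD kubernetes.service.exposed.type=NODE_PORT

# Submit a job to it, using the address printed by the command above.
./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-session-example \
  -ksa x.x.x.x:12345 examples/streaming/WindowJoin.jar

# Or skip the session and launch a dedicated job cluster directly,
# with either the official image or a user-built one.
./bin/flink run -d -p 4 -m kubernetes-cluster -knm flink-perjob-example-1 \
  -ki flink:latest examples/streaming/WindowJoin.jar
```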