Hi,
I am working on allowing Zeppelin's Flink interpreter to connect to an existing YARN cluster. The YARN cluster was started via yarn-session, and the Flink version is 1.0.0.

My approach is to read the host and port from .yarn-properties and pass them to FlinkILoop. I am now facing an issue with the session ID when I submit a paragraph to the YARN cluster. The YARN cluster logs a warning similar to:

2016-04-12 10:14:32,666 WARN org.apache.flink.yarn.YarnJobManager - Discard message LeaderSessionMessage(null,SubmitJob(JobGraph(jobId: 0b6811bc58d781ddb6f5aac994afd903),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID Some(afc85978-f765-488b-acbb-79c2d7cb89e0) did not equal the received leader session ID None.

My Zeppelin paragraph throws a JobClientActorSubmissionTimeoutException. Could this be due to the missing session ID? Do I need to pass extra parameters to connect correctly to the YARN cluster, or are host and port enough?

Thanks in advance,
Andrea
Hi Andrea,
have you started the Flink YARN cluster in HA mode? If so, the JobManager address is stored in ZooKeeper, and you have to tell your FlinkILoop to retrieve the JobManager address from there. To do that, set the following, where conf is the Flink configuration object:

conf.setString(ConfigConstants.RECOVERY_MODE, "zookeeper")
conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "address of your zookeeper cluster")
conf.setString(ConfigConstants.ZOOKEEPER_DIR_KEY, "flink dir you've set")

These values must match the values specified in the flink-conf.yaml file. You then pass the conf object to the FlinkILoop.

I'm not sure whether you can specify a custom Flink configuration in Zeppelin. I think you can only specify a host and port. So either you start your Flink cluster in non-HA mode or you have to patch Zeppelin.

Cheers,
Till
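Since the client-side values have to agree with flink-conf.yaml, a small check can catch a mismatch before a job is submitted. The following is only a sketch under stated assumptions: HaConfigCheck, parseFlinkConf and mismatches are hypothetical names, not part of Flink or Zeppelin; the three key names are the ones that appear in the configuration logged later in this thread; and the file is assumed to consist of simple "key: value" lines. It uses plain maps instead of Flink's Configuration class so that it runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Flink or Zeppelin. */
class HaConfigCheck {

    // HA-related keys as they appear in the configuration logged in this thread.
    static final List<String> HA_KEYS = Arrays.asList(
            "recovery.mode",
            "recovery.zookeeper.quorum",
            "recovery.zookeeper.path.root");

    /** Parses simple "key: value" lines, skipping blank lines and '#' comments. */
    static Map<String, String> parseFlinkConf(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            // Split on the first ": " so that values such as host:2181 stay intact.
            int sep = line.indexOf(": ");
            if (sep <= 0) {
                continue; // not a "key: value" pair
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
        }
        return conf;
    }

    /** Returns the HA keys whose client value is missing or differs from the cluster value. */
    static List<String> mismatches(Map<String, String> client, Map<String, String> cluster) {
        List<String> bad = new ArrayList<>();
        for (String key : HA_KEYS) {
            String value = client.get(key);
            if (value == null || !value.equals(cluster.get(key))) {
                bad.add(key);
            }
        }
        return bad;
    }
}
```

An empty result from mismatches means the three HA settings handed to the client agree with the parsed file; any returned key points at a setting worth double-checking.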
Hi Till,
The cluster was started in HA mode. I have already patched the Flink interpreter to allow passing the Configuration to FlinkILoop. Nevertheless, I still have to pass a host and port to FlinkILoop, because they are required by the FlinkILoop constructor; I retrieve them from the .yarn-properties file.

I logged the Flink Configuration:

INFO [2016-04-14 17:52:58,141] ({pool-2-thread-2} FlinkInterpreter.java[open]:96) - Flink Configuration: { recovery.mode=zookeeper, host=yarn, yarn-properties=/tmp/.yarn-properties-flink, recovery.zookeeper.quorum=slave01:2181,slave02:2181,master:2181, recovery.zookeeper.path.root=/flink/recovery}

I also attach some logs:

Error displayed in the Zeppelin paragraph: <https://gist.github.com/alkagin/612d736da8af9ee111e766b230559bb9>
JobManager log: <https://gist.github.com/alkagin/0a0b2670ce77f7d9c0807b1e4ef7239a>
Interpreter/FlinkILoop log: <https://gist.github.com/alkagin/23e4cec15904448dd2b400a6a37f7fa7>

I was looking at the Flink shell, and it works similarly to the interpreter. Does it work with an HA cluster?

Thank you,
Andrea
In HA mode, the host and port information you provide to the Shell should simply be ignored, so you don't have to retrieve them from the .yarn-properties file.

Could you maybe run the FlinkInterpreter with debug log level and share the logs with me? You can also do that privately if you don't want to share them on the mailing list.

I haven't tried it myself, but I thought that the Shell also works with an HA cluster, because it uses the same mechanism as the CLI, for example. I'll try it out later today.

Cheers,
Till
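The warning quoted at the start of the thread suggests that the JobManager compares the leader session ID carried by each message with the one it expects, and discards the message when they differ. A client that connects by host and port without the HA settings presumably sends no session ID at all, which would correspond to the "received leader session ID None" in the log. The following is a minimal model of that comparison, not Flink's actual implementation; LeaderSessionFilter and accepts are hypothetical names chosen for illustration.

```java
import java.util.Optional;
import java.util.UUID;

/** Hypothetical model of the check implied by the warning; not Flink's code. */
class LeaderSessionFilter {

    // Session ID the current leader expects, empty if none is set.
    private final Optional<UUID> expected;

    LeaderSessionFilter(Optional<UUID> expected) {
        this.expected = expected;
    }

    /** Accepts a message only if its session ID equals the expected one. */
    boolean accepts(Optional<UUID> received) {
        return expected.equals(received);
    }
}
```

With the expected ID set to the value from the log, a message carrying an empty session ID is rejected, while a message carrying the same UUID is accepted. This is consistent with the advice above: the client needs the ZooKeeper settings so that it can look up the current leader rather than relying on host and port alone.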
Hi Andrea,
I think your problem should be fixed with the PRs [1,2]. I've tested it locally on my YARN cluster and it worked.

[1] https://github.com/apache/flink/pull/1904
[2] https://github.com/apache/flink/pull/1914

Cheers,
Till

On Tue, Apr 19, 2016 at 2:16 PM, Till Rohrmann <[hidden email]> wrote:
> I think this is another issue you've detected. I already spotted some suspicious code in the yarn deployment section. If I'm not mistaken, flink-conf.yaml is read too late and is thus not respected. I'll verify it and, if valid, I'll open another issue and fix it.
>
> Thanks for your patience and thorough reporting. It helps a lot :-)
>
> Cheers,
> Till
>
> On Tue, Apr 19, 2016 at 2:12 PM, Andrea Sella <[hidden email]> wrote:
> No, I tried it via the scala-shell, as you can see in the attachment.
>
> Regards,
> Andrea
>
> 2016-04-19 14:08 GMT+02:00 Till Rohrmann <[hidden email]>:
> Hi Andrea,
>
> thanks for testing it. How did you submit the job this time? Via Zeppelin?
>
> Cheers,
> Till
>
> On Tue, Apr 19, 2016 at 12:51 PM, Andrea Sella <[hidden email]> wrote:
> Hi Till,
>
> I've used your branch fixScalaShell to test the scala-shell with our HA cluster, and it doesn't work. Same error as before:
>
> 2016-04-19 06:40:35,030 WARN org.apache.flink.yarn.YarnJobManager - Discard message LeaderSessionMessage(null,SubmitJob(JobGraph(jobId: aa5b034e10a850d863642a24aab75d9c),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID Some(bc706707-2bab-4b82-b7a7-1426dce696a7) did not equal the received leader session ID None.
>
> If I submit a simple job, it works. I think it is not a problem of our environment.
>
> Cheers,
> Andrea
>
> 2016-04-18 18:41 GMT+02:00 Till Rohrmann <[hidden email]>:
> Cool, that helps a lot :-)
>
> On Mon, Apr 18, 2016 at 6:06 PM, Andrea Sella <[hidden email]> wrote:
> Hi Till,
>
> Don't worry, I am going to test the PR in our HA environment.
>
> Cheers,
> Andrea
>
> 2016-04-18 17:46 GMT+02:00 Till Rohrmann <[hidden email]>:
> Hi Andrea,
>
> sorry, I saw your mail too late. I already fixed the problem and opened a PR [1] for it. I hope you haven't invested too much time in it yet.
>
> [1] https://github.com/apache/flink/pull/1904
>
> Cheers,
> Till
>
> On Mon, Apr 18, 2016 at 11:19 AM, Andrea Sella <[hidden email]> wrote:
> Hi Till,
>
> Thanks for the support. I will take the issue and start working on it asap.
>
> Regards,
> Andrea
>
> 2016-04-18 10:32 GMT+02:00 Till Rohrmann <[hidden email]>:
> Hi Andrea,
>
> I think the problem is simply that it has not been correctly implemented. I just checked, and I think the user configuration is not given to the PlanExecutor which is internally created. I've opened an issue for that [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-3774
>
> Cheers,
> Till
>
> On Fri, Apr 15, 2016 at 4:58 PM, Andrea Sella <[hidden email]> wrote:
> Hi Till,
>
> I've tried the Scala-Shell with our HA cluster, no luck again.
>
> Cheers,
> Andrea
>
> 2016-04-15 14:43 GMT+02:00 Andrea Sella <[hidden email]>:
> Hi Till,
>
> I am using a branched version of 1.0.1 where I cherry-picked FLINK-2935 <https://github.com/radicalbit/flink/commit/dfbbb9e48c98b486baf279c396d1bf7de31c1f8c> to use FlinkILoop with a Configuration. My Flink interpreter is here <https://github.com/radicalbit/incubator-zeppelin/blob/flink-yarn-interpreter/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java>. I started tweaking it just two days ago, and as far as I can see there is a Zeppelin issue <https://issues.apache.org/jira/browse/ZEPPELIN-664> to make the FlinkInterpreter work with YARN, which I need too.
>
> Thanks,
> Andrea
>
> 2016-04-15 14:20 GMT+02:00 Till Rohrmann <[hidden email]>:
> Hi Andrea,
>
> which version of Flink are you using with Zeppelin? How do you pass the Flink configuration to the FlinkILoop? Could you maybe show me your version of Zeppelin (code)?
>
> According to the log, the ScalaShellRemoteEnvironment didn't get the Flink configuration with the HA settings. Therefore, it still tries to connect to the JobManager specified by the host and port values. The functionality to pass a Flink configuration object to FlinkILoop has only been merged recently. You might have to switch to the 1.1-SNAPSHOT version for that. This means that you would have to update the Flink version in your Zeppelin branch to 1.1-SNAPSHOT to make it work.
>
> Cheers,
> Till
>
> On Fri, Apr 15, 2016 at 1:03 PM, Andrea Sella <[hidden email]> wrote:
> Hi Till,
>
> Thanks for following this issue with me :)
>
> Here are the logs <https://gist.github.com/alkagin/663fae1fc2993f0acd3ba66697f14093>. Are they enough?
>
> As I wrote in the previous mail, you can also see the Configuration in the logs.
>
> Thanks,
> Andrea
Hi Till,
It works as expected, thanks!

Andrea