Not able to run beam appliction on Flink cluster environment

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Not able to run beam appliction on Flink cluster environment

P. Ramanjaneya Reddy
Hi All,

We followed the steps mentinoned in below link to setup flink cluster
(Standalone)
*https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/cluster_setup.html
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/cluster_setup.html>*


In the same cluster we are able to run the flink wordcount example, but the
beam wordcount execution gives below error

*commandline execution:*
root1@master:~/Projects/beam/examples/java/target$      *mvn package
exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \*
*     -Dexec.args="--runner=FlinkRunner --flinkMaster="192.168.56.1:6123
<http://192.168.56.1:6123>" --filesToStage=target/word-count-beam-0.1.jar \*
*                  --inputFile=/home/root1/temp/input.txt
--output=/home/root1/temp/output.txt" -Pflink-runner*

*Logs:*
NFO: Submitting job with JobID: 9edd3c2e1d318da5d3ffda1cdefa52c7. Waiting
for job completion.
Submitting job with JobID: 9edd3c2e1d318da5d3ffda1cdefa52c7. Waiting for
job completion.
Aug 22, 2017 2:56:05 PM
org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader get
INFO: Starting client actor system.
Aug 22, 2017 2:56:05 PM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
applyOrElse
INFO: Slf4jLogger started
Aug 22, 2017 2:56:05 PM
akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
apply$mcV$sp
INFO: Starting remoting
Aug 22, 2017 2:56:06 PM
akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
apply$mcV$sp
INFO: Remoting started; listening on addresses :[akka.tcp://flink@master
:44871]
Aug 22, 2017 2:56:06 PM org.apache.flink.runtime.client.JobClientActor
disconnectFromJobManager
*INFO: Disconnect from JobManager null.*
Aug 22, 2017 2:56:06 PM org.apache.flink.runtime.client.JobClientActor
handleMessage
INFO: Received SubmitJobAndWait(JobGraph(jobId:
9edd3c2e1d318da5d3ffda1cdefa52c7)) but there is no connection to a
JobManager yet.
Aug 22, 2017 2:56:06 PM
org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
INFO: Received job wordcount-root1-0822092604-654fbb92
(9edd3c2e1d318da5d3ffda1cdefa52c7).
Aug 22, 2017 2:57:06 PM org.apache.flink.runtime.client.JobClientActor
terminate
INFO: Terminate JobClientActor.
Aug 22, 2017 2:57:06 PM org.apache.flink.runtime.client.JobClientActor
disconnectFromJobManager
INFO: Disconnect from JobManager null.
Aug 22, 2017 2:57:06 PM
akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
apply$mcV$sp
INFO: Shutting down remote daemon.
Aug 22, 2017 2:57:06 PM
akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
apply$mcV$sp
INFO: Remote daemon shut down; proceeding with flushing remote transports.
Aug 22, 2017 2:57:06 PM
akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
apply$mcV$sp
INFO: Remoting shut down.
Aug 22, 2017 2:57:06 PM org.apache.beam.runners.flink.FlinkRunner run
SEVERE: Pipeline execution failed
org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Couldn't retrieve the JobExecutionResult from the
JobManager.
    at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
    at
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
    at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
    at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387)
    at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:362)
    at
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
    at
org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
    at
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
    at
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:112)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:119)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
    at org.apache.beam.examples.WordCount.main(WordCount.java:184)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't
retrieve the JobExecutionResult from the JobManager.
    at
org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:294)
    at
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:382)
    at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
    ... 18 more
Caused by:
org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException:
Lost connection to the JobManager.
    at
org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:207)
    at
org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
    at
org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
    at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Thanks & Regards,
Ramanji.
Reply | Threaded
Open this post in threaded view
|

Re: Not able to run beam appliction on Flink cluster environment

Till Rohrmann
Hi Ramanji,

do you have the logs of the Flink master running at 192.168.56.1:6123?

Cheers,
Till

On Tue, Aug 22, 2017 at 2:43 PM, P. Ramanjaneya Reddy <[hidden email]>
wrote:

> Hi All,
>
> We followed the steps mentinoned in below link to setup flink cluster
> (Standalone)
> *https://ci.apache.org/projects/flink/flink-docs-
> release-1.2/setup/cluster_setup.html
> <https://ci.apache.org/projects/flink/flink-docs-
> release-1.2/setup/cluster_setup.html>*
>
>
> In the same cluster we are able to run the flink wordcount example, but the
> beam wordcount execution gives below error
>
> *commandline execution:*
> root1@master:~/Projects/beam/examples/java/target$      *mvn package
> exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \*
> *     -Dexec.args="--runner=FlinkRunner --flinkMaster="192.168.56.1:6123
> <http://192.168.56.1:6123>" --filesToStage=target/word-count-beam-0.1.jar
> \*
> *                  --inputFile=/home/root1/temp/input.txt
> --output=/home/root1/temp/output.txt" -Pflink-runner*
>
> *Logs:*
> NFO: Submitting job with JobID: 9edd3c2e1d318da5d3ffda1cdefa52c7. Waiting
> for job completion.
> Submitting job with JobID: 9edd3c2e1d318da5d3ffda1cdefa52c7. Waiting for
> job completion.
> Aug 22, 2017 2:56:05 PM
> org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader get
> INFO: Starting client actor system.
> Aug 22, 2017 2:56:05 PM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
> applyOrElse
> INFO: Slf4jLogger started
> Aug 22, 2017 2:56:05 PM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> apply$mcV$sp
> INFO: Starting remoting
> Aug 22, 2017 2:56:06 PM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> apply$mcV$sp
> INFO: Remoting started; listening on addresses :[akka.tcp://flink@master
> :44871]
> Aug 22, 2017 2:56:06 PM org.apache.flink.runtime.client.JobClientActor
> disconnectFromJobManager
> *INFO: Disconnect from JobManager null.*
> Aug 22, 2017 2:56:06 PM org.apache.flink.runtime.client.JobClientActor
> handleMessage
> INFO: Received SubmitJobAndWait(JobGraph(jobId:
> 9edd3c2e1d318da5d3ffda1cdefa52c7)) but there is no connection to a
> JobManager yet.
> Aug 22, 2017 2:56:06 PM
> org.apache.flink.runtime.client.JobSubmissionClientActor
> handleCustomMessage
> INFO: Received job wordcount-root1-0822092604-654fbb92
> (9edd3c2e1d318da5d3ffda1cdefa52c7).
> Aug 22, 2017 2:57:06 PM org.apache.flink.runtime.client.JobClientActor
> terminate
> INFO: Terminate JobClientActor.
> Aug 22, 2017 2:57:06 PM org.apache.flink.runtime.client.JobClientActor
> disconnectFromJobManager
> INFO: Disconnect from JobManager null.
> Aug 22, 2017 2:57:06 PM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> apply$mcV$sp
> INFO: Shutting down remote daemon.
> Aug 22, 2017 2:57:06 PM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> apply$mcV$sp
> INFO: Remote daemon shut down; proceeding with flushing remote transports.
> Aug 22, 2017 2:57:06 PM
> akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> apply$mcV$sp
> INFO: Remoting shut down.
> Aug 22, 2017 2:57:06 PM org.apache.beam.runners.flink.FlinkRunner run
> SEVERE: Pipeline execution failed
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Couldn't retrieve the JobExecutionResult from the
> JobManager.
>     at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>     at
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(
> StandaloneClusterClient.java:101)
>     at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>     at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387)
>     at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:362)
>     at
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(
> RemoteExecutor.java:211)
>     at
> org.apache.flink.client.RemoteExecutor.executePlan(
> RemoteExecutor.java:188)
>     at
> org.apache.flink.api.java.RemoteEnvironment.execute(
> RemoteEnvironment.java:172)
>     at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironm
> ent.executePipeline(FlinkPipelineExecutionEnvironment.java:112)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:119)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
>     at org.apache.beam.examples.WordCount.main(WordCount.java:184)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
> 62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't
> retrieve the JobExecutionResult from the JobManager.
>     at
> org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:
> 294)
>     at
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.
> java:382)
>     at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
>     ... 18 more
> Caused by:
> org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException:
> Lost connection to the JobManager.
>     at
> org.apache.flink.runtime.client.JobClientActor.
> handleMessage(JobClientActor.java:207)
>     at
> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(
> FlinkUntypedActor.java:88)
>     at
> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(
> FlinkUntypedActor.java:68)
>     at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(
> UntypedActor.scala:167)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> ForkJoinTask.java:260)
>     at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
>     at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
>
> Thanks & Regards,
> Ramanji.
>
Reply | Threaded
Open this post in threaded view
|

Re: Not able to run beam appliction on Flink cluster environment

P. Ramanjaneya Reddy
Hi Till,

I'm not sure as i'm following correct steps...there is no log file
generated on flink cluster.

1. followed the steps mentinoned in below link to setup flink cluster
(Standalone)

https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/cluster_setup.html


Then

2.run beam word count on beam apche.

*Beam application run: (gives error for beam wordcount)*
root1@master:~*/Projects/beam/examples/jav*a$ mvn package exec:java
-Dexec.mainClass=org.apache.beam.examples.WordCount
 -Dexec.args="--runner=FlinkRunner --flinkMaster=192.168.56.1:6123
--filesToStage=target/beam-examples-java-2.2.0-SNAPSHOT.jar \
                  --inputFile=/home/root1/temp/input.txt
 --output=/home/root1/temp/output.txt" -Pflink-runner -Dcheckstyle.skip


*Cluster running:  (same cluster we are able to run the flink wordcount
example)*
root1@master:*~/NAI/Tools/BEAM/Flink_Cluster/rama/flink*$ ll
total 276
drwxrwxr-x 10 root1 root1   4096 Aug  7 17:57 ./
drwxrwxr-x  3 root1 root1   4096 Aug  7 13:27 ../
drwxrwxr-x  2 root1 root1   4096 Aug  3 16:50 bin/
drwxrwxr-x  2 root1 root1   4096 Aug 21 16:18 conf/
drwxrwxr-x  6 root1 root1   4096 Aug  3 16:50 examples/
-rw-rw-r--  1 root1 root1 210708 Aug  7 17:57 hamlet.txt
drwxrwxr-x  2 root1 root1   4096 Aug  3 16:50 lib/
-rw-r--r--  1 root1 root1  18196 Jul 28 20:10 LICENSE
drwxrwxr-x  2 root1 root1   4096 Aug 23 09:24 log/
-rw-r--r--  1 root1 root1    779 Jul 28 20:10 NOTICE
drwxrwxr-x  2 root1 root1   4096 Aug  3 16:50 opt/
-rw-r--r--  1 root1 root1   1308 Jul 28 20:10 README.txt
drwxrwxr-x  3 root1 root1   4096 Aug  3 16:50 resources/
drwxrwxr-x  2 root1 root1   4096 Aug  3 16:50 tools/
root1@master:~/NAI/Tools/BEAM/Flink_Cluster/rama/flink$
root1@master:~/NAI/Tools/BEAM/Flink_Cluster/rama/flink$ cd log/
root1@master:~/NAI/Tools/BEAM/Flink_Cluster/rama/flink/log$ ll
total 8
drwxrwxr-x  2 root1 root1 4096 Aug 23 09:24 ./
drwxrwxr-x 10 root1 root1 4096 Aug  7 17:57 ../
root1@master:~/NAI/Tools/BEAM/Flink_Cluster/rama/flink/log$




On Tue, Aug 22, 2017 at 6:43 PM, Till Rohrmann <[hidden email]> wrote:

> Hi Ramanji,
>
> do you have the logs of the Flink master running at 192.168.56.1:6123?
>
> Cheers,
> Till
>
> On Tue, Aug 22, 2017 at 2:43 PM, P. Ramanjaneya Reddy <
> [hidden email]>
> wrote:
>
> > Hi All,
> >
> > We followed the steps mentinoned in below link to setup flink cluster
> > (Standalone)
> > *https://ci.apache.org/projects/flink/flink-docs-
> > release-1.2/setup/cluster_setup.html
> > <https://ci.apache.org/projects/flink/flink-docs-
> > release-1.2/setup/cluster_setup.html>*
> >
> >
> > In the same cluster we are able to run the flink wordcount example, but
> the
> > beam wordcount execution gives below error
> >
> > *commandline execution:*
> > root1@master:~/Projects/beam/examples/java/target$      *mvn package
> > exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \*
> > *     -Dexec.args="--runner=FlinkRunner --flinkMaster="192.168.56.1:6123
> > <http://192.168.56.1:6123>" --filesToStage=target/word-
> count-beam-0.1.jar
> > \*
> > *                  --inputFile=/home/root1/temp/input.txt
> > --output=/home/root1/temp/output.txt" -Pflink-runner*
> >
> > *Logs:*
> > NFO: Submitting job with JobID: 9edd3c2e1d318da5d3ffda1cdefa52c7.
> Waiting
> > for job completion.
> > Submitting job with JobID: 9edd3c2e1d318da5d3ffda1cdefa52c7. Waiting for
> > job completion.
> > Aug 22, 2017 2:56:05 PM
> > org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader get
> > INFO: Starting client actor system.
> > Aug 22, 2017 2:56:05 PM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
> > applyOrElse
> > INFO: Slf4jLogger started
> > Aug 22, 2017 2:56:05 PM
> > akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> > apply$mcV$sp
> > INFO: Starting remoting
> > Aug 22, 2017 2:56:06 PM
> > akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> > apply$mcV$sp
> > INFO: Remoting started; listening on addresses :[akka.tcp://flink@master
> > :44871]
> > Aug 22, 2017 2:56:06 PM org.apache.flink.runtime.client.JobClientActor
> > disconnectFromJobManager
> > *INFO: Disconnect from JobManager null.*
> > Aug 22, 2017 2:56:06 PM org.apache.flink.runtime.client.JobClientActor
> > handleMessage
> > INFO: Received SubmitJobAndWait(JobGraph(jobId:
> > 9edd3c2e1d318da5d3ffda1cdefa52c7)) but there is no connection to a
> > JobManager yet.
> > Aug 22, 2017 2:56:06 PM
> > org.apache.flink.runtime.client.JobSubmissionClientActor
> > handleCustomMessage
> > INFO: Received job wordcount-root1-0822092604-654fbb92
> > (9edd3c2e1d318da5d3ffda1cdefa52c7).
> > Aug 22, 2017 2:57:06 PM org.apache.flink.runtime.client.JobClientActor
> > terminate
> > INFO: Terminate JobClientActor.
> > Aug 22, 2017 2:57:06 PM org.apache.flink.runtime.client.JobClientActor
> > disconnectFromJobManager
> > INFO: Disconnect from JobManager null.
> > Aug 22, 2017 2:57:06 PM
> > akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> > apply$mcV$sp
> > INFO: Shutting down remote daemon.
> > Aug 22, 2017 2:57:06 PM
> > akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> > apply$mcV$sp
> > INFO: Remote daemon shut down; proceeding with flushing remote
> transports.
> > Aug 22, 2017 2:57:06 PM
> > akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3
> > apply$mcV$sp
> > INFO: Remoting shut down.
> > Aug 22, 2017 2:57:06 PM org.apache.beam.runners.flink.FlinkRunner run
> > SEVERE: Pipeline execution failed
> > org.apache.flink.client.program.ProgramInvocationException: The program
> > execution failed: Couldn't retrieve the JobExecutionResult from the
> > JobManager.
> >     at
> > org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:427)
> >     at
> > org.apache.flink.client.program.StandaloneClusterClient.submitJob(
> > StandaloneClusterClient.java:101)
> >     at
> > org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:400)
> >     at
> > org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:387)
> >     at
> > org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:362)
> >     at
> > org.apache.flink.client.RemoteExecutor.executePlanWithJars(
> > RemoteExecutor.java:211)
> >     at
> > org.apache.flink.client.RemoteExecutor.executePlan(
> > RemoteExecutor.java:188)
> >     at
> > org.apache.flink.api.java.RemoteEnvironment.execute(
> > RemoteEnvironment.java:172)
> >     at
> > org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironm
> > ent.executePipeline(FlinkPipelineExecutionEnvironment.java:112)
> >     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.
> java:119)
> >     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> >     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> >     at org.apache.beam.examples.WordCount.main(WordCount.java:184)
> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at
> > sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:
> > 62)
> >     at
> > sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > DelegatingMethodAccessorImpl.java:43)
> >     at java.lang.reflect.Method.invoke(Method.java:498)
> >     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
> >     at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.runtime.client.JobExecutionException:
> Couldn't
> > retrieve the JobExecutionResult from the JobManager.
> >     at
> > org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:
> > 294)
> >     at
> > org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.
> > java:382)
> >     at
> > org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:423)
> >     ... 18 more
> > Caused by:
> > org.apache.flink.runtime.client.JobClientActorConnectionTimeou
> tException:
> > Lost connection to the JobManager.
> >     at
> > org.apache.flink.runtime.client.JobClientActor.
> > handleMessage(JobClientActor.java:207)
> >     at
> > org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(
> > FlinkUntypedActor.java:88)
> >     at
> > org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(
> > FlinkUntypedActor.java:68)
> >     at
> > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(
> > UntypedActor.scala:167)
> >     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> >     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> >     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> >     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> >     at
> > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> > AbstractDispatcher.scala:397)
> >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> > ForkJoinTask.java:260)
> >     at
> > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> > runTask(ForkJoinPool.java:1339)
> >     at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >     at
> > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> > ForkJoinWorkerThread.java:107)
> >
> >
> > Thanks & Regards,
> > Ramanji.
> >
>