Apache Flink: ProgramInvocationException on YARN

Apache Flink: ProgramInvocationException on YARN

hanan meyer
Hello All

When using the Eclipse IDE to submit a Flink job to a single-node YARN
cluster, I'm getting:
"org.apache.flink.client.program.ProgramInvocationException: Failed to
resolve JobManager"

Using Flink 0.9.0

The jar copies a file from one HDFS location to another and works fine
when executed locally on the single-node YARN cluster:

    bin/flink run -c Test ./examples/MyJar.jar hdfs://localhost:9000/flink/in.txt hdfs://localhost:9000/flink/out.txt

The code skeleton:

    ExecutionEnvironment envRemote = ExecutionEnvironment.createRemoteEnvironment(
            FLINK_SERVER_URL, FLINK_PORT, JAR_PATH_ON_CLIENT);
    DataSet<String> data = envRemote.readTextFile("hdfs://localhost:9000/flink/in.txt");
    data.writeAsText("hdfs://localhost:9000/flink/out.txt");
    envRemote.execute();


Please advise,

Hanan Meyer

Re: Apache Flink: ProgramInvocationException on YARN

Robert Metzger
Hi,

Which values did you use for FLINK_SERVER_URL and FLINK_PORT?
Every time you deploy Flink on YARN, the host and port change, because the
JobManager is started on a different YARN container.
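For reference, the usual workflow around that changing address is to let the YARN session client track it (a sketch with example flag values; container counts, memory sizes, and paths are illustrative, not from the original post):

```shell
# Start a Flink YARN session with one TaskManager (example values).
# Once the session is up, the client prints the JobManager's host and port,
# which differ on every deployment.
./bin/yarn-session.sh -n 1 -jm 1024 -tm 1024

# Jobs submitted with bin/flink from the same client machine then locate
# the session's JobManager automatically, without hard-coded host/port.
./bin/flink run -c Test ./examples/MyJar.jar \
    hdfs://localhost:9000/flink/in.txt hdfs://localhost:9000/flink/out.txt
```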



Re: Apache Flink: ProgramInvocationException on YARN

Stephan Ewen
If you start the job via the "bin/flink" script, then simply use
"ExecutionEnvironment.getExecutionEnvironment()" rather than creating a
remote environment manually.

That way, hosts and ports are configured automatically.
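A minimal sketch of that approach (the class and job name are illustrative; the HDFS paths are the ones from the original post):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class Test {
    public static void main(String[] args) throws Exception {
        // When launched via "bin/flink run", this returns an environment
        // already wired to the cluster's JobManager; no host or port needed.
        // When run standalone in the IDE, it falls back to a local environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> data = env.readTextFile("hdfs://localhost:9000/flink/in.txt");
        data.writeAsText("hdfs://localhost:9000/flink/out.txt");
        env.execute("HDFS copy");
    }
}
```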


Re: Apache Flink: ProgramInvocationException on YARN

Alexey Sapozhnikov
Hello all.

Some clarification: locally everything works great.
However, once we run Flink on a remote Linux machine and try to run the
client program from our machine using createRemoteEnvironment, the Flink
JobManager raises this exception.


Re: Apache Flink: ProgramInvocationException on YARN

Stephan Ewen
Please subscribe to the mailing list. All your mails are held back and need
to be manually approved.


Re: Apache Flink: ProgramInvocationException on YARN

Robert Metzger
I guess you are getting a full stack trace after the
"org.apache.flink.client.program.ProgramInvocationException: Failed to
resolve JobManager".
Can you post it here to help us understand the issue?


Re: Apache Flink: ProgramInvocationException on YARN

hanan meyer
Hi

1. I have restarted the Flink service via stop/start-local.sh - it
restarted successfully, with no errors in the log folder.
2. The default Flink port is 6123.

This is what I'm getting via the Eclipse IDE:

Thanks


org.apache.flink.client.program.ProgramInvocationException: Failed to resolve JobManager
    at org.apache.flink.client.program.Client.run(Client.java:379)
    at org.apache.flink.client.program.Client.run(Client.java:356)
    at org.apache.flink.client.program.Client.run(Client.java:349)
    at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
    at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
    at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at Test.main(Test.java:39)
Caused by: java.io.IOException: JobManager at akka.tcp://flink@FLINK_SERVER_URL:6123/user/jobmanager not reachable. Please make sure that the JobManager is running and its port is reachable.
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1197)
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1221)
    at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1239)
    at org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
    at org.apache.flink.client.program.Client.run(Client.java:376)
    ... 7 more
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/), Path(/user/jobmanager)]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
    at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
    at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
    at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
    at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
    at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
    at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:369)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



Re: Apache Flink: ProgramInvocationException on YARN

hanan meyer
Hi

I'm currently using Flink 0.9.0, which via Maven supports Hadoop 1.
Using flink-clients-0.7.0-hadoop2-incubating.jar with the executePlan(Plan p)
method instead, I'm getting the same exception.

Hanan


Re: Apache Flink: ProgramInvocationException on YARN

Robert Metzger
Hi,

in the exception you've posted earlier, you can see the following root
cause:

Caused by: akka.actor.ActorNotFound: Actor not found for:
ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
Path(/user/jobmanager)]

This string "akka.tcp://flink@FLINK_SERVER_URL:6123/" usually looks like
this: "akka.tcp://flink@1.2.3.4:6123/". So it seems that you are passing
the literal string FLINK_SERVER_URL as the server hostname (or IP).
Can you pass the correct hostname when you call
ExecutionEnvironment.createRemoteEnvironment()?
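To illustrate why the placeholder shows up verbatim: the actor address the client tries to resolve is built directly from the host and port you pass in (the host value below is hypothetical; substitute the real JobManager host):

```java
public class JobManagerAddressDemo {
    public static void main(String[] args) {
        // Hypothetical values: replace with the actual JobManager host and port.
        String host = "1.2.3.4";
        int port = 6123;

        // Mirrors the address format seen in the stack trace; if host were the
        // literal string "FLINK_SERVER_URL", that is exactly what would appear.
        String address = String.format("akka.tcp://flink@%s:%d/user/jobmanager", host, port);
        System.out.println(address);  // akka.tcp://flink@1.2.3.4:6123/user/jobmanager
    }
}
```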

> >> > >> > Hello All
> >> > >> >
> >> > >> > When using Eclipse IDE to submit Flink to Yarn single node
> cluster
> >> I'm
> >> > >> > getting :
> >> > >> > "org.apache.flink.client.program.ProgramInvocationException:
> >> Failed to
> >> > >> > resolve JobManager"
> >> > >> >
> >> > >> > Using Flink 0.9.0
> >> > >> >
> >> > >> > The Jar copy a file from one location in Hdfs to another and
> works
> >> > fine
> >> > >> > while executed locally on the single node Yarn cluster -
> >> > >> > bin/flink run -c Test ./examples/MyJar.jar
> >> > >> > hdfs://localhost:9000/flink/in.txt
> >> hdfs://localhost:9000/flink/out.txt
> >> > >> >
> >> > >> > The code skeleton:
> >> > >> >
> >> > >> >     ExecutionEnvironment envRemote =
> >> > >> > ExecutionEnvironment.createRemoteEnvironment
> >> > >> > (FLINK_SERVER_URL,FLINK PORT,JAR_PATH_ON_CLIENT);
> >> > >> > DataSet<String> data =
> >> > >> > envRemote.readTextFile("hdfs://localhost:9000/flink/in.txt");
> >> > >> > data.writeAsText("hdfs://localhost:9000/flink/out.txt");
> >> > >> > envRemote.execute();
> >> > >> >
> >> > >> >
> >> > >> > Please advise,
> >> > >> >
> >> > >> > Hanan Meyer
> >> > >> >
> >> > >>
> >> > >
> >> > >
> >> >
> >>
> >
> >
>

Re: Apache Flink:ProgramInvocationException on Yarn

hanan meyer
Hi

I'm running against a real server IP, but for security reasons I can't share
it here.
I used "FLINK_SERVER_URL" in my post only as a placeholder for the actual IP.

Hanan Meyer

On Fri, Aug 28, 2015 at 10:27 AM, Robert Metzger <[hidden email]>
wrote:

> Hi,
>
> in the exception you've posted earlier, you can see the following root
> cause:
>
> Caused by: akka.actor.ActorNotFound: Actor not found for:
> ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> Path(/user/jobmanager)]
>
> This string "akka.tcp://flink@FLINK_SERVER_URL:6123/" usually looks like
> this: "akka.tcp://flink@1.2.3.4:6123/". So it seems that you are
> passing FLINK_SERVER_URL
> as the server hostname (or ip).
> Can you pass the correct hostname when you call ExecutionEnvironment.
> createRemoteEnvironment().
>
> On Fri, Aug 28, 2015 at 7:52 AM, Hanan Meyer <[hidden email]> wrote:
>
> > Hi
> >  I'm currently using flink 0.9.0 which by maven support Hadoop 1 .
> > By using flink-clients-0.7.0-hadoop2-incubating.jar with executePlan(Plan
> > p) method  instead, I'm getting the same exception
> >
> > Hanan
> >
> > On Fri, Aug 28, 2015 at 8:35 AM, Hanan Meyer <[hidden email]> wrote:
> >
> > >
> > > Hi
> > >
> > > 1. I have restarted Flink service via stop/start-loval.sh - it have
> been
> > > restarted successfully ,no errors in log folder
> > > 2. default flink port is -6123
> > >
> > > Getting this via Eclips IDE:
> > >
> > > Thanks
> > >
> > >
> > > org.apache.flink.client.program.ProgramInvocationException: Failed to
> > > resolve JobManager
> > > at org.apache.flink.client.program.Client.run(Client.java:379)
> > > at org.apache.flink.client.program.Client.run(Client.java:356)
> > > at org.apache.flink.client.program.Client.run(Client.java:349)
> > > at
> > >
> >
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
> > > at
> > >
> >
> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
> > > at
> > >
> >
> org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
> > > at
> > >
> >
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> > > at Test.main(Test.java:39)
> > > Caused by: java.io.IOException: JobManager at
> > > akka.tcp://flink@FLINK_SERVER_URL:6123/user/jobmanager not reachable.
> > > Please make sure that the JobManager is running and its port is
> > reachable.
> > > at
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1197)
> > > at
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1221)
> > > at
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1239)
> > > at
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
> > > at org.apache.flink.client.program.Client.run(Client.java:376)
> > > ... 7 more
> > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> > > Path(/user/jobmanager)]
> > > at
> > >
> >
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> > > at
> > >
> >
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> > > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > > at akka.dispatch.BatchingExecutor$
> > > Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
> > > at
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
> > > at
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > at
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > at
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> > > at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
> > > at
> > >
> >
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> > > at
> > akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
> > > at
> > >
> >
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> > > at
> > >
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> > > at
> > >
> >
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> > > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
> > > at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
> > > at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
> > > at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
> > > at
> > >
> >
> akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
> > > at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
> > > at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
> > > at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
> > > at
> > >
> >
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> > > at
> > >
> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> > > at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> > > at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> > > at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> > > at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
> > > at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> > > at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > at
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> > > at
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> > > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > at
> > >
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > >
> > >
> > > On Thu, Aug 27, 2015 at 10:47 PM, Robert Metzger <[hidden email]>
> > > wrote:
> > >
> > >> I guess you are getting an entire exception after the
> "org.apache.flink
> > >> .client.program.ProgramInvocationException: Failed to
> > >> resolve JobManager".
> > >> Can you post it here to help us understanding the issue?
> > >>
> > >> On Thu, Aug 27, 2015 at 6:55 PM, Alexey Sapozhnikov <
> > [hidden email]>
> > >> wrote:
> > >>
> > >> > Hello all.
> > >> >
> > >> > Some clarification: locally everything works great.
> > >> > However once we run our Flink on remote linux machine and try to run
> > the
> > >> > client program from our machine, using create remote environment-
> > Flink
> > >> > JobManager is raising this exception
> > >> >
> > >> > On Thu, Aug 27, 2015 at 7:41 PM, Stephan Ewen <[hidden email]>
> > wrote:
> > >> >
> > >> > > If you start the job via the "bin/flink" script, then simply use
> > >> > > "ExecutionEnvironment.getExecutionEnvironment()" rather then
> > creating
> > >> a
> > >> > > remote environment manually.
> > >> > >
> > >> > > That way, hosts and ports are configured automatically.
> > >> > >
> > >> > > On Thu, Aug 27, 2015 at 6:39 PM, Robert Metzger <
> > [hidden email]>
> > >> > > wrote:
> > >> > >
> > >> > >> Hi,
> > >> > >>
> > >> > >> Which values did you use for FLINK_SERVER_URL and FLINK_PORT?
> > >> > >> Every time you deploy Flink on YARN, the host and port change,
> > >> because
> > >> > the
> > >> > >> JobManager is started on a different YARN container.
> > >> > >>
> > >> > >>
> > >> > >> On Thu, Aug 27, 2015 at 6:32 PM, Hanan Meyer <[hidden email]
> >
> > >> > wrote:
> > >> > >>
> > >> > >> > Hello All
> > >> > >> >
> > >> > >> > When using Eclipse IDE to submit Flink to Yarn single node
> > cluster
> > >> I'm
> > >> > >> > getting :
> > >> > >> > "org.apache.flink.client.program.ProgramInvocationException:
> > >> Failed to
> > >> > >> > resolve JobManager"
> > >> > >> >
> > >> > >> > Using Flink 0.9.0
> > >> > >> >
> > >> > >> > The Jar copy a file from one location in Hdfs to another and
> > works
> > >> > fine
> > >> > >> > while executed locally on the single node Yarn cluster -
> > >> > >> > bin/flink run -c Test ./examples/MyJar.jar
> > >> > >> > hdfs://localhost:9000/flink/in.txt
> > >> hdfs://localhost:9000/flink/out.txt
> > >> > >> >
> > >> > >> > The code skeleton:
> > >> > >> >
> > >> > >> >     ExecutionEnvironment envRemote =
> > >> > >> > ExecutionEnvironment.createRemoteEnvironment
> > >> > >> > (FLINK_SERVER_URL,FLINK PORT,JAR_PATH_ON_CLIENT);
> > >> > >> > DataSet<String> data =
> > >> > >> > envRemote.readTextFile("hdfs://localhost:9000/flink/in.txt");
> > >> > >> > data.writeAsText("hdfs://localhost:9000/flink/out.txt");
> > >> > >> > envRemote.execute();
> > >> > >> >
> > >> > >> >
> > >> > >> > Please advise,
> > >> > >> >
> > >> > >> > Hanan Meyer
> > >> > >> >
> > >> > >>
> > >> > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>

Re: Apache Flink:ProgramInvocationException on Yarn

Stephan Ewen
Can you try not creating a "RemoteExecutionEnvironment" manually, and instead
use the recommended way of doing this?

Please use "ExecutionEnvironment.getExecutionEnvironment()", since you run the
program through the command line anyway.

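A minimal sketch of the pattern Stephan recommends, reusing the HDFS paths from the original post. When the jar is submitted with `bin/flink run`, `getExecutionEnvironment()` picks up the cluster's host and port automatically, so nothing needs to be hard-coded (the class name `Test` matches the earlier command line; the job name string is an illustrative addition):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class Test {
    public static void main(String[] args) throws Exception {
        // Resolves to the submitting cluster's environment when run via
        // "bin/flink run", or to a local environment inside the IDE.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Same copy job as in the original post: read one HDFS file,
        // write it back to another location.
        DataSet<String> data = env.readTextFile("hdfs://localhost:9000/flink/in.txt");
        data.writeAsText("hdfs://localhost:9000/flink/out.txt");

        env.execute("HDFS copy job");
    }
}
```

This way the same jar works locally, on a standalone cluster, and on YARN without code changes.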
On Fri, Aug 28, 2015 at 1:04 PM, Hanan Meyer <[hidden email]> wrote:

> Hi
>
> I'm running with a formal server ip but for securuty reasons I can't share
> with you the real ip .
> I put "FLINK_SERVER_URL" in order to replace the actual ip only in my post
> .
>
> Hanan Meyer
>
> On Fri, Aug 28, 2015 at 10:27 AM, Robert Metzger <[hidden email]>
> wrote:
>
> > Hi,
> >
> > in the exception you've posted earlier, you can see the following root
> > cause:
> >
> > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> > Path(/user/jobmanager)]
> >
> > This string "akka.tcp://flink@FLINK_SERVER_URL:6123/" usually looks like
> > this: "akka.tcp://flink@1.2.3.4:6123/". So it seems that you are
> > passing FLINK_SERVER_URL
> > as the server hostname (or ip).
> > Can you pass the correct hostname when you call ExecutionEnvironment.
> > createRemoteEnvironment().
> >
> > On Fri, Aug 28, 2015 at 7:52 AM, Hanan Meyer <[hidden email]> wrote:
> >
> > > Hi
> > >  I'm currently using flink 0.9.0 which by maven support Hadoop 1 .
> > > By using flink-clients-0.7.0-hadoop2-incubating.jar with
> executePlan(Plan
> > > p) method  instead, I'm getting the same exception
> > >
> > > Hanan
> > >
> > > On Fri, Aug 28, 2015 at 8:35 AM, Hanan Meyer <[hidden email]>
> wrote:
> > >
> > > >
> > > > Hi
> > > >
> > > > 1. I have restarted Flink service via stop/start-loval.sh - it have
> > been
> > > > restarted successfully ,no errors in log folder
> > > > 2. default flink port is -6123
> > > >
> > > > Getting this via Eclips IDE:
> > > >
> > > > Thanks
> > > >
> > > >
> > > > org.apache.flink.client.program.ProgramInvocationException: Failed to
> > > > resolve JobManager
> > > > at org.apache.flink.client.program.Client.run(Client.java:379)
> > > > at org.apache.flink.client.program.Client.run(Client.java:356)
> > > > at org.apache.flink.client.program.Client.run(Client.java:349)
> > > > at
> > > >
> > >
> >
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
> > > > at
> > > >
> > >
> >
> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
> > > > at
> > > >
> > >
> >
> org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
> > > > at
> > > >
> > >
> >
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> > > > at Test.main(Test.java:39)
> > > > Caused by: java.io.IOException: JobManager at
> > > > akka.tcp://flink@FLINK_SERVER_URL:6123/user/jobmanager not
> reachable.
> > > > Please make sure that the JobManager is running and its port is
> > > reachable.
> > > > at
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1197)
> > > > at
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1221)
> > > > at
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1239)
> > > > at
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
> > > > at org.apache.flink.client.program.Client.run(Client.java:376)
> > > > ... 7 more
> > > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> > > > Path(/user/jobmanager)]
> > > > at
> > > >
> > >
> >
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> > > > at
> > > >
> > >
> >
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> > > > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > > > at akka.dispatch.BatchingExecutor$
> > > > Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
> > > > at
> > > >
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
> > > > at
> > > >
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > > at
> > > >
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > > at
> > scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> > > > at
> akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
> > > > at
> > > >
> > >
> >
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> > > > at
> > >
> akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
> > > > at
> > > >
> > >
> >
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> > > > at
> > > >
> > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> > > > at
> > > >
> > >
> >
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> > > > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
> > > > at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
> > > > at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
> > > > at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
> > > > at
> > > >
> > >
> >
> akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
> > > > at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
> > > > at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
> > > > at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
> > > > at
> > > >
> > >
> >
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> > > > at
> > > >
> > akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> > > > at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> > > > at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> > > > at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> > > > at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
> > > > at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> > > > at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > > > at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > at
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> > > > at
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> > > > at
> > >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > at
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > >
> > > >
> > > > On Thu, Aug 27, 2015 at 10:47 PM, Robert Metzger <
> [hidden email]>
> > > > wrote:
> > > >
> > > >> I guess you are getting an entire exception after the
> > "org.apache.flink
> > > >> .client.program.ProgramInvocationException: Failed to
> > > >> resolve JobManager".
> > > >> Can you post it here to help us understanding the issue?
> > > >>
> > > >> On Thu, Aug 27, 2015 at 6:55 PM, Alexey Sapozhnikov <
> > > [hidden email]>
> > > >> wrote:
> > > >>
> > > >> > Hello all.
> > > >> >
> > > >> > Some clarification: locally everything works great.
> > > >> > However once we run our Flink on remote linux machine and try to
> run
> > > the
> > > >> > client program from our machine, using create remote environment-
> > > Flink
> > > >> > JobManager is raising this exception
> > > >> >
> > > >> > On Thu, Aug 27, 2015 at 7:41 PM, Stephan Ewen <[hidden email]>
> > > wrote:
> > > >> >
> > > >> > > If you start the job via the "bin/flink" script, then simply use
> > > >> > > "ExecutionEnvironment.getExecutionEnvironment()" rather then
> > > creating
> > > >> a
> > > >> > > remote environment manually.
> > > >> > >
> > > >> > > That way, hosts and ports are configured automatically.
> > > >> > >
> > > >> > > On Thu, Aug 27, 2015 at 6:39 PM, Robert Metzger <
> > > [hidden email]>
> > > >> > > wrote:
> > > >> > >
> > > >> > >> Hi,
> > > >> > >>
> > > >> > >> Which values did you use for FLINK_SERVER_URL and FLINK_PORT?
> > > >> > >> Every time you deploy Flink on YARN, the host and port change,
> > > >> because
> > > >> > the
> > > >> > >> JobManager is started on a different YARN container.
> > > >> > >>
> > > >> > >>
> > > >> > >> On Thu, Aug 27, 2015 at 6:32 PM, Hanan Meyer <
> [hidden email]
> > >
> > > >> > wrote:
> > > >> > >>
> > > >> > >> > Hello All
> > > >> > >> >
> > > >> > >> > When using Eclipse IDE to submit Flink to Yarn single node
> > > cluster
> > > >> I'm
> > > >> > >> > getting :
> > > >> > >> > "org.apache.flink.client.program.ProgramInvocationException:
> > > >> Failed to
> > > >> > >> > resolve JobManager"
> > > >> > >> >
> > > >> > >> > Using Flink 0.9.0
> > > >> > >> >
> > > >> > >> > The Jar copy a file from one location in Hdfs to another and
> > > works
> > > >> > fine
> > > >> > >> > while executed locally on the single node Yarn cluster -
> > > >> > >> > bin/flink run -c Test ./examples/MyJar.jar
> > > >> > >> > hdfs://localhost:9000/flink/in.txt
> > > >> hdfs://localhost:9000/flink/out.txt
> > > >> > >> >
> > > >> > >> > The code skeleton:
> > > >> > >> >
> > > >> > >> >     ExecutionEnvironment envRemote =
> > > >> > >> > ExecutionEnvironment.createRemoteEnvironment
> > > >> > >> > (FLINK_SERVER_URL,FLINK PORT,JAR_PATH_ON_CLIENT);
> > > >> > >> > DataSet<String> data =
> > > >> > >> > envRemote.readTextFile("hdfs://localhost:9000/flink/in.txt");
> > > >> > >> > data.writeAsText("hdfs://localhost:9000/flink/out.txt");
> > > >> > >> > envRemote.execute();
> > > >> > >> >
> > > >> > >> >
> > > >> > >> > Please advise,
> > > >> > >> >
> > > >> > >> > Hanan Meyer
> > > >> > >> >
> > > >> > >>
> > > >> > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>

Re: Apache Flink:ProgramInvocationException on Yarn

hanan meyer
In reply to this post by Robert Metzger
Hello.
Let me clarify the situation.
1. We are using Flink 0.9.0 for Hadoop 2.7, connected to HDFS 2.7.1.
2. Locally, our program works: once we start Flink with ./start-local.sh,
we are able to connect and run the createRemoteEnvironment and execute
methods.
3. Due to our architecture and a basic Flink feature, we want to invoke this
functionality REMOTELY, with our Java code calling the Flink methods
from another server.
4. We tried both ExecutionEnvironment.createRemoteEnvironment("1.2.3.1",
6123, "TestProj.jar"); and ExecutionEnvironment.createRemoteEnvironment("
flink@1.2.3.1", 6123, "TestProj.jar"); (which is definitely not right, since
it should be an IP address) - both crash with the "can't reach JobManager"
error.

It seems to us this is one of two issues:
1. Somehow we need to configure Flink to accept connections from the
remote machine.
2. Flink has a critical showstopper bug that jeopardizes the whole decision
to use this technology.

Please advise us how we should advance.
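As Robert pointed out earlier, the JobManager's actor address has the form "akka.tcp://flink@1.2.3.4:6123/user/jobmanager": only the bare host (no "flink@" prefix) and the port from that address belong in createRemoteEnvironment. A small helper like the following (not part of Flink's API, purely illustrative) shows how to split such an actor path into the host and port:

```java
public class JobManagerAddress {

    /** Returns {host, port} parsed from an akka.tcp actor path. */
    public static String[] parse(String actorPath) {
        // Strip everything up to and including the '@' that separates the
        // actor system name ("flink") from the host.
        int at = actorPath.indexOf('@');
        if (at < 0) {
            throw new IllegalArgumentException("No '@' in actor path: " + actorPath);
        }
        String hostPort = actorPath.substring(at + 1);

        // Cut off the actor path after the authority, e.g. "/user/jobmanager".
        int slash = hostPort.indexOf('/');
        if (slash >= 0) {
            hostPort = hostPort.substring(0, slash);
        }

        int colon = hostPort.lastIndexOf(':');
        return new String[] {
            hostPort.substring(0, colon),     // bare host, e.g. "1.2.3.4"
            hostPort.substring(colon + 1)     // port, e.g. "6123"
        };
    }

    public static void main(String[] args) {
        String[] hp = parse("akka.tcp://flink@1.2.3.4:6123/user/jobmanager");
        System.out.println(hp[0] + " " + hp[1]); // prints "1.2.3.4 6123"
    }
}
```

The host string returned here (without "flink@") is what createRemoteEnvironment expects as its first argument.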




On Fri, Aug 28, 2015 at 10:27 AM, Robert Metzger <[hidden email]>
wrote:

> Hi,
>
> in the exception you've posted earlier, you can see the following root
> cause:
>
> Caused by: akka.actor.ActorNotFound: Actor not found for:
> ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> Path(/user/jobmanager)]
>
> This string "akka.tcp://flink@FLINK_SERVER_URL:6123/" usually looks like
> this: "akka.tcp://flink@1.2.3.4:6123/". So it seems that you are
> passing FLINK_SERVER_URL
> as the server hostname (or ip).
> Can you pass the correct hostname when you call ExecutionEnvironment.
> createRemoteEnvironment().
>
> On Fri, Aug 28, 2015 at 7:52 AM, Hanan Meyer <[hidden email]> wrote:
>
> > Hi
> >  I'm currently using flink 0.9.0 which by maven support Hadoop 1 .
> > By using flink-clients-0.7.0-hadoop2-incubating.jar with executePlan(Plan
> > p) method  instead, I'm getting the same exception
> >
> > Hanan
> >
> > On Fri, Aug 28, 2015 at 8:35 AM, Hanan Meyer <[hidden email]> wrote:
> >
> > >
> > > Hi
> > >
> > > 1. I have restarted Flink service via stop/start-loval.sh - it have
> been
> > > restarted successfully ,no errors in log folder
> > > 2. default flink port is -6123
> > >
> > > Getting this via Eclips IDE:
> > >
> > > Thanks
> > >
> > >
> > > org.apache.flink.client.program.ProgramInvocationException: Failed to
> > > resolve JobManager
> > > at org.apache.flink.client.program.Client.run(Client.java:379)
> > > at org.apache.flink.client.program.Client.run(Client.java:356)
> > > at org.apache.flink.client.program.Client.run(Client.java:349)
> > > at
> > >
> >
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
> > > at
> > >
> >
> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
> > > at
> > >
> >
> org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
> > > at
> > >
> >
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> > > at Test.main(Test.java:39)
> > > Caused by: java.io.IOException: JobManager at
> > > akka.tcp://flink@FLINK_SERVER_URL:6123/user/jobmanager not reachable.
> > > Please make sure that the JobManager is running and its port is
> > reachable.
> > > at
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1197)
> > > at
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1221)
> > > at
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1239)
> > > at
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
> > > at org.apache.flink.client.program.Client.run(Client.java:376)
> > > ... 7 more
> > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> > > Path(/user/jobmanager)]
> > > at
> > >
> >
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> > > at
> > >
> >
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> > > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > > at akka.dispatch.BatchingExecutor$
> > > Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
> > > at
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
> > > at
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > at
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > at
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> > > at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
> > > at
> > >
> >
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> > > at
> > akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
> > > at
> > >
> >
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> > > at
> > >
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> > > at
> > >
> >
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> > > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
> > > at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
> > > at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
> > > at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
> > > at
> > >
> >
> akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
> > > at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
> > > at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
> > > at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
> > > at
> > >
> >
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> > > at
> > >
> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> > > at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> > > at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> > > at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> > > at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
> > > at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> > > at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > at
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> > > at
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> > > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > at
> > >
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > >
> > >
> > > On Thu, Aug 27, 2015 at 10:47 PM, Robert Metzger <[hidden email]>
> > > wrote:
> > >
> > >> I guess you are getting an entire exception after the
> "org.apache.flink
> > >> .client.program.ProgramInvocationException: Failed to
> > >> resolve JobManager".
> > >> Can you post it here to help us understanding the issue?
> > >>
> > >> On Thu, Aug 27, 2015 at 6:55 PM, Alexey Sapozhnikov <
> > [hidden email]>
> > >> wrote:
> > >>
> > >> > Hello all.
> > >> >
> > >> > Some clarification: locally everything works great.
> > >> > However once we run our Flink on remote linux machine and try to run
> > the
> > >> > client program from our machine, using create remote environment-
> > Flink
> > >> > JobManager is raising this exception
> > >> >
> > >> > On Thu, Aug 27, 2015 at 7:41 PM, Stephan Ewen <[hidden email]>
> > wrote:
> > >> >
> > >> > > If you start the job via the "bin/flink" script, then simply use
> > >> > > "ExecutionEnvironment.getExecutionEnvironment()" rather then
> > creating
> > >> a
> > >> > > remote environment manually.
> > >> > >
> > >> > > That way, hosts and ports are configured automatically.
> > >> > >
> > >> > > On Thu, Aug 27, 2015 at 6:39 PM, Robert Metzger <
> > [hidden email]>
> > >> > > wrote:
> > >> > >
> > >> > >> Hi,
> > >> > >>
> > >> > >> Which values did you use for FLINK_SERVER_URL and FLINK_PORT?
> > >> > >> Every time you deploy Flink on YARN, the host and port change,
> > >> because
> > >> > the
> > >> > >> JobManager is started on a different YARN container.
> > >> > >>
> > >> > >>
> > >> > >> On Thu, Aug 27, 2015 at 6:32 PM, Hanan Meyer <[hidden email]
> >
> > >> > wrote:
> > >> > >>
> > >> > >> > Hello All
> > >> > >> >
> > >> > >> > When using Eclipse IDE to submit Flink to Yarn single node
> > cluster
> > >> I'm
> > >> > >> > getting :
> > >> > >> > "org.apache.flink.client.program.ProgramInvocationException:
> > >> Failed to
> > >> > >> > resolve JobManager"
> > >> > >> >
> > >> > >> > Using Flink 0.9.0
> > >> > >> >
> > >> > >> > The Jar copy a file from one location in Hdfs to another and
> > works
> > >> > fine
> > >> > >> > while executed locally on the single node Yarn cluster -
> > >> > >> > bin/flink run -c Test ./examples/MyJar.jar
> > >> > >> > hdfs://localhost:9000/flink/in.txt
> > >> hdfs://localhost:9000/flink/out.txt
> > >> > >> >
> > >> > >> > The code skeleton:
> > >> > >> >
> > >> > >> >     ExecutionEnvironment envRemote =
> > >> > >> > ExecutionEnvironment.createRemoteEnvironment
> > >> > >> > (FLINK_SERVER_URL,FLINK PORT,JAR_PATH_ON_CLIENT);
> > >> > >> > DataSet<String> data =
> > >> > >> > envRemote.readTextFile("hdfs://localhost:9000/flink/in.txt");
> > >> > >> > data.writeAsText("hdfs://localhost:9000/flink/out.txt");
> > >> > >> > envRemote.execute();
> > >> > >> >
> > >> > >> >
> > >> > >> > Please advise,
> > >> > >> >
> > >> > >> > Hanan Meyer
> > >> > >> >
> > >> > >>
> > >> > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>

Re: Apache Flink:ProgramInvocationException on Yarn

Stephan Ewen
The only thing I can think of is that you are not using the right host/port
for the JobManager.

When you start the YARN session, it prints the host where the
JobManager runs. You also need to take the port from there: on YARN, the
port is usually not 6123, because YARN starts many services on one machine
and randomizes the ports.

It may be worth adding a YARNExecutionEnvironment at some point, which
deals with this transparently (starting the YARN cluster, connecting to the
JobManager).
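
The address the client tries to resolve has the shape shown in the stack
traces quoted in this thread (akka.tcp://flink@HOST:PORT/user/jobmanager).
A minimal sketch (plain Java, not Flink API; class and method names are
made up for illustration) that assembles that address from the host and
port the YARN session prints:

```java
// Sketch only: builds the JobManager actor address string that the client
// resolves. The URL pattern is taken from the error messages in this thread;
// the class/method names here are illustrative, not part of Flink's API.
public class JobManagerAddress {

    public static String jobManagerUrl(String host, int port) {
        return String.format("akka.tcp://flink@%s:%d/user/jobmanager", host, port);
    }

    public static void main(String[] args) {
        // Example values as reported by a YARN session -- not the default 6123.
        System.out.println(jobManagerUrl("quickstart.cloudera", 39956));
    }
}
```

If the host or port in this string is wrong (e.g. a placeholder like
FLINK_SERVER_URL, or the default 6123 on YARN), the resolution fails with
exactly the ActorNotFound error quoted below.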

On Sun, Aug 30, 2015 at 10:12 AM, Hanan Meyer <[hidden email]> wrote:

> Hello.
> Let me clarify the situation.
> 1. We are using flink 0.9.0 for Hadoop 2.7. We connected it to HDFS 2.7.1.
> 2. Locally, our program is working: once we run flink as ./start-local.sh,
> we are able to connect and run the createRemoteEnvironment and Execute
> methods.
> 3.Due to our architecture and basic Flink feature we want to invoke this
> functionality REMOTELY , when our Java code is calling the Flink methods
> from another server.
> 4.We tried both ExecutionEnvironment.createRemoteEnvironment("1.2.3.1",
> 6123, "TestProj.jar"); and ExecutionEnvironment.createRemoteEnvironment("
> flink@1.2.3.1", 6123, "TestProj.jar"); (which is definitely not right
> since
> it should be an IP address) - it crash on the "cant reach JobManager"
> error.
>
> It seems to us that it can be  one of 2 issues.
> 1.Somehow we need to configure flink to accept the connections from the
> remote machine
> 2.Flink has a critical showstopper bug that jeopardizing a whole decision
> to use this technology.
>
> Please advise us how we should advance.
>
>
>
>
> On Fri, Aug 28, 2015 at 10:27 AM, Robert Metzger <[hidden email]>
> wrote:
>
> > Hi,
> >
> > in the exception you've posted earlier, you can see the following root
> > cause:
> >
> > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> > Path(/user/jobmanager)]
> >
> > This string "akka.tcp://flink@FLINK_SERVER_URL:6123/" usually looks like
> > this: "akka.tcp://flink@1.2.3.4:6123/". So it seems that you are
> > passing FLINK_SERVER_URL
> > as the server hostname (or ip).
> > Can you pass the correct hostname when you call ExecutionEnvironment.
> > createRemoteEnvironment().
> >
> > On Fri, Aug 28, 2015 at 7:52 AM, Hanan Meyer <[hidden email]> wrote:
> >
> > > Hi
> > >  I'm currently using flink 0.9.0 which by maven support Hadoop 1 .
> > > By using flink-clients-0.7.0-hadoop2-incubating.jar with
> executePlan(Plan
> > > p) method  instead, I'm getting the same exception
> > >
> > > Hanan
> > >
> > > On Fri, Aug 28, 2015 at 8:35 AM, Hanan Meyer <[hidden email]>
> wrote:
> > >
> > > >
> > > > Hi
> > > >
> > > > 1. I have restarted Flink service via stop/start-loval.sh - it have
> > been
> > > > restarted successfully ,no errors in log folder
> > > > 2. default flink port is -6123
> > > >
> > > > Getting this via Eclips IDE:
> > > >
> > > > Thanks
> > > >
> > > >
> > > > org.apache.flink.client.program.ProgramInvocationException: Failed to
> > > > resolve JobManager
> > > > at org.apache.flink.client.program.Client.run(Client.java:379)
> > > > at org.apache.flink.client.program.Client.run(Client.java:356)
> > > > at org.apache.flink.client.program.Client.run(Client.java:349)
> > > > at
> > > >
> > >
> >
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
> > > > at
> > > >
> > >
> >
> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
> > > > at
> > > >
> > >
> >
> org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
> > > > at
> > > >
> > >
> >
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> > > > at Test.main(Test.java:39)
> > > > Caused by: java.io.IOException: JobManager at
> > > > akka.tcp://flink@FLINK_SERVER_URL:6123/user/jobmanager not
> reachable.
> > > > Please make sure that the JobManager is running and its port is
> > > reachable.
> > > > at
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1197)
> > > > at
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1221)
> > > > at
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1239)
> > > > at
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
> > > > at org.apache.flink.client.program.Client.run(Client.java:376)
> > > > ... 7 more
> > > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> > > > Path(/user/jobmanager)]
> > > > at
> > > >
> > >
> >
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> > > > at
> > > >
> > >
> >
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> > > > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > > > at akka.dispatch.BatchingExecutor$
> > > > Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
> > > > at
> > > >
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
> > > > at
> > > >
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > > at
> > > >
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > > at
> > scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> > > > at
> akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
> > > > at
> > > >
> > >
> >
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> > > > at
> > >
> akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
> > > > at
> > > >
> > >
> >
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> > > > at
> > > >
> > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> > > > at
> > > >
> > >
> >
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> > > > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
> > > > at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
> > > > at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
> > > > at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
> > > > at
> > > >
> > >
> >
> akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
> > > > at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
> > > > at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
> > > > at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
> > > > at
> > > >
> > >
> >
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> > > > at
> > > >
> > akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> > > > at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> > > > at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> > > > at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> > > > at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
> > > > at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> > > > at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > > > at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > at
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> > > > at
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> > > > at
> > >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > at
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > >
> > > >
> > > > On Thu, Aug 27, 2015 at 10:47 PM, Robert Metzger <
> [hidden email]>
> > > > wrote:
> > > >
> > > >> I guess you are getting an entire exception after the
> > "org.apache.flink
> > > >> .client.program.ProgramInvocationException: Failed to
> > > >> resolve JobManager".
> > > >> Can you post it here to help us understanding the issue?
> > > >>
> > > >> On Thu, Aug 27, 2015 at 6:55 PM, Alexey Sapozhnikov <
> > > [hidden email]>
> > > >> wrote:
> > > >>
> > > >> > Hello all.
> > > >> >
> > > >> > Some clarification: locally everything works great.
> > > >> > However once we run our Flink on remote linux machine and try to
> run
> > > the
> > > >> > client program from our machine, using create remote environment-
> > > Flink
> > > >> > JobManager is raising this exception
> > > >> >
> > > >> > On Thu, Aug 27, 2015 at 7:41 PM, Stephan Ewen <[hidden email]>
> > > wrote:
> > > >> >
> > > >> > > If you start the job via the "bin/flink" script, then simply use
> > > >> > > "ExecutionEnvironment.getExecutionEnvironment()" rather then
> > > creating
> > > >> a
> > > >> > > remote environment manually.
> > > >> > >
> > > >> > > That way, hosts and ports are configured automatically.
> > > >> > >
> > > >> > > On Thu, Aug 27, 2015 at 6:39 PM, Robert Metzger <
> > > [hidden email]>
> > > >> > > wrote:
> > > >> > >
> > > >> > >> Hi,
> > > >> > >>
> > > >> > >> Which values did you use for FLINK_SERVER_URL and FLINK_PORT?
> > > >> > >> Every time you deploy Flink on YARN, the host and port change,
> > > >> because
> > > >> > the
> > > >> > >> JobManager is started on a different YARN container.
> > > >> > >>
> > > >> > >>
> > > >> > >> On Thu, Aug 27, 2015 at 6:32 PM, Hanan Meyer <
> [hidden email]
> > >
> > > >> > wrote:
> > > >> > >>
> > > >> > >> > Hello All
> > > >> > >> >
> > > >> > >> > When using Eclipse IDE to submit Flink to Yarn single node
> > > cluster
> > > >> I'm
> > > >> > >> > getting :
> > > >> > >> > "org.apache.flink.client.program.ProgramInvocationException:
> > > >> Failed to
> > > >> > >> > resolve JobManager"
> > > >> > >> >
> > > >> > >> > Using Flink 0.9.0
> > > >> > >> >
> > > >> > >> > The Jar copy a file from one location in Hdfs to another and
> > > works
> > > >> > fine
> > > >> > >> > while executed locally on the single node Yarn cluster -
> > > >> > >> > bin/flink run -c Test ./examples/MyJar.jar
> > > >> > >> > hdfs://localhost:9000/flink/in.txt
> > > >> hdfs://localhost:9000/flink/out.txt
> > > >> > >> >
> > > >> > >> > The code skeleton:
> > > >> > >> >
> > > >> > >> >     ExecutionEnvironment envRemote =
> > > >> > >> > ExecutionEnvironment.createRemoteEnvironment
> > > >> > >> > (FLINK_SERVER_URL,FLINK PORT,JAR_PATH_ON_CLIENT);
> > > >> > >> > DataSet<String> data =
> > > >> > >> > envRemote.readTextFile("hdfs://localhost:9000/flink/in.txt");
> > > >> > >> > data.writeAsText("hdfs://localhost:9000/flink/out.txt");
> > > >> > >> > envRemote.execute();
> > > >> > >> >
> > > >> > >> >
> > > >> > >> > Please advise,
> > > >> > >> >
> > > >> > >> > Hanan Meyer
> > > >> > >> >
> > > >> > >>
> > > >> > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>

Re: Apache Flink:ProgramInvocationException on Yarn

Robert Metzger
The output of the YARN session should look like this:

Flink JobManager is now running on quickstart.cloudera:39956
JobManager Web Interface:
http://quickstart.cloudera:8088/proxy/application_1440768826963_0005/
Number of connected TaskManagers changed to 1. Slots available: 1
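
Assuming the session output keeps the format above, the JobManager host and
port can be extracted programmatically before calling
ExecutionEnvironment.createRemoteEnvironment(host, port, jarPath). A minimal
sketch (plain Java; the class and method names are illustrative):

```java
// Sketch only: parses the "Flink JobManager is now running on HOST:PORT"
// line printed by the YARN session. Assumes the line format shown above;
// class and method names are made up for illustration.
public class YarnSessionOutput {

    public static String[] parseJobManagerLine(String line) {
        String prefix = "Flink JobManager is now running on ";
        String hostPort = line.substring(prefix.length());
        int colon = hostPort.lastIndexOf(':');
        // Returns { host, port } as strings; parse the port with
        // Integer.parseInt before passing it to createRemoteEnvironment.
        return new String[] { hostPort.substring(0, colon), hostPort.substring(colon + 1) };
    }

    public static void main(String[] args) {
        String[] hp = parseJobManagerLine(
                "Flink JobManager is now running on quickstart.cloudera:39956");
        System.out.println(hp[0] + " / " + hp[1]);
    }
}
```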




On Sun, Aug 30, 2015 at 11:12 AM, Stephan Ewen <[hidden email]> wrote:

> The only thing I can think of is that you are not using the right host/port
> for the JobManager.
>
> When you start the YARN session, it should print the host where the
> JobManager runs. You also need to take the port from there, as in YARN, the
> port is usually not 6123. Yarn starts many services on one machine, so the
> ports need to be randomized.
>
> It may be worth adding a YARNExecutionEnvironment at some point, which
> deals with this transparent (starting the YARN cluster, connecting to the
> JobManager).
>
> On Sun, Aug 30, 2015 at 10:12 AM, Hanan Meyer <[hidden email]> wrote:
>
> > Hello.
> > Let me clarify the situation.
> > 1. We are using flink 0.9.0 for Hadoop 2.7. We connected it to HDFS
> 2.7.1.
> > 2. Locally, our program is working: once we run flink as
> ./start-local.sh,
> > we are able to connect and run the createRemoteEnvironment and Execute
> > methods.
> > 3.Due to our architecture and basic Flink feature we want to invoke this
> > functionality REMOTELY , when our Java code is calling the Flink methods
> > from another server.
> > 4.We tried both ExecutionEnvironment.createRemoteEnvironment("1.2.3.1",
> > 6123, "TestProj.jar"); and ExecutionEnvironment.createRemoteEnvironment("
> > flink@1.2.3.1", 6123, "TestProj.jar"); (which is definitely not right
> > since
> > it should be an IP address) - it crash on the "cant reach JobManager"
> > error.
> >
> > It seems to us that it can be  one of 2 issues.
> > 1.Somehow we need to configure flink to accept the connections from the
> > remote machine
> > 2.Flink has a critical showstopper bug that jeopardizing a whole decision
> > to use this technology.
> >
> > Please advise us how we should advance.
> >
> >
> >
> >
> > On Fri, Aug 28, 2015 at 10:27 AM, Robert Metzger <[hidden email]>
> > wrote:
> >
> > > Hi,
> > >
> > > in the exception you've posted earlier, you can see the following root
> > > cause:
> > >
> > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> > > Path(/user/jobmanager)]
> > >
> > > This string "akka.tcp://flink@FLINK_SERVER_URL:6123/" usually looks
> like
> > > this: "akka.tcp://flink@1.2.3.4:6123/". So it seems that you are
> > > passing FLINK_SERVER_URL
> > > as the server hostname (or ip).
> > > Can you pass the correct hostname when you call ExecutionEnvironment.
> > > createRemoteEnvironment().
> > >
> > > On Fri, Aug 28, 2015 at 7:52 AM, Hanan Meyer <[hidden email]>
> wrote:
> > >
> > > > Hi
> > > >  I'm currently using flink 0.9.0 which by maven support Hadoop 1 .
> > > > By using flink-clients-0.7.0-hadoop2-incubating.jar with
> > executePlan(Plan
> > > > p) method  instead, I'm getting the same exception
> > > >
> > > > Hanan
> > > >
> > > > On Fri, Aug 28, 2015 at 8:35 AM, Hanan Meyer <[hidden email]>
> > wrote:
> > > >
> > > > >
> > > > > Hi
> > > > >
> > > > > 1. I have restarted Flink service via stop/start-loval.sh - it have
> > > been
> > > > > restarted successfully ,no errors in log folder
> > > > > 2. default flink port is -6123
> > > > >
> > > > > Getting this via Eclips IDE:
> > > > >
> > > > > Thanks
> > > > >
> > > > >
> > > > > [stack trace snipped - identical to the trace quoted earlier in the thread]
> > > > >
> > > > >
> > > > > On Thu, Aug 27, 2015 at 10:47 PM, Robert Metzger <
> > [hidden email]>
> > > > > wrote:
> > > > >
> > > > >> I guess you are getting an entire exception after the
> > > "org.apache.flink
> > > > >> .client.program.ProgramInvocationException: Failed to
> > > > >> resolve JobManager".
> > > > >> Can you post it here to help us understanding the issue?
> > > > >>
> > > > >> On Thu, Aug 27, 2015 at 6:55 PM, Alexey Sapozhnikov <
> > > > [hidden email]>
> > > > >> wrote:
> > > > >>
> > > > >> > Hello all.
> > > > >> >
> > > > >> > Some clarification: locally everything works great.
> > > > >> > However once we run our Flink on remote linux machine and try to
> > run
> > > > the
> > > > >> > client program from our machine, using create remote
> environment-
> > > > Flink
> > > > >> > JobManager is raising this exception
> > > > >> >
> > > > >> > On Thu, Aug 27, 2015 at 7:41 PM, Stephan Ewen <[hidden email]
> >
> > > > wrote:
> > > > >> >
> > > > >> > > If you start the job via the "bin/flink" script, then simply
> use
> > > > >> > > "ExecutionEnvironment.getExecutionEnvironment()" rather then
> > > > creating
> > > > >> a
> > > > >> > > remote environment manually.
> > > > >> > >
> > > > >> > > That way, hosts and ports are configured automatically.
> > > > >> > >
> > > > >> > > On Thu, Aug 27, 2015 at 6:39 PM, Robert Metzger <
> > > > [hidden email]>
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > >> Hi,
> > > > >> > >>
> > > > >> > >> Which values did you use for FLINK_SERVER_URL and FLINK_PORT?
> > > > >> > >> Every time you deploy Flink on YARN, the host and port
> change,
> > > > >> because
> > > > >> > the
> > > > >> > >> JobManager is started on a different YARN container.
> > > > >> > >>
> > > > >> > >>
> > > > >> > >> On Thu, Aug 27, 2015 at 6:32 PM, Hanan Meyer <
> > [hidden email]
> > > >
> > > > >> > wrote:
> > > > >> > >>
> > > > >> > >> > Hello All
> > > > >> > >> >
> > > > >> > >> > When using Eclipse IDE to submit Flink to Yarn single node
> > > > cluster
> > > > >> I'm
> > > > >> > >> > getting :
> > > > >> > >> >
> "org.apache.flink.client.program.ProgramInvocationException:
> > > > >> Failed to
> > > > >> > >> > resolve JobManager"
> > > > >> > >> >
> > > > >> > >> > Using Flink 0.9.0
> > > > >> > >> >
> > > > >> > >> > The Jar copy a file from one location in Hdfs to another
> and
> > > > works
> > > > >> > fine
> > > > >> > >> > while executed locally on the single node Yarn cluster -
> > > > >> > >> > bin/flink run -c Test ./examples/MyJar.jar
> > > > >> > >> > hdfs://localhost:9000/flink/in.txt
> > > > >> hdfs://localhost:9000/flink/out.txt
> > > > >> > >> >
> > > > >> > >> > The code skeleton:
> > > > >> > >> >
> > > > >> > >> >     ExecutionEnvironment envRemote =
> > > > >> > >> > ExecutionEnvironment.createRemoteEnvironment
> > > > >> > >> > (FLINK_SERVER_URL,FLINK PORT,JAR_PATH_ON_CLIENT);
> > > > >> > >> > DataSet<String> data =
> > > > >> > >> >
> envRemote.readTextFile("hdfs://localhost:9000/flink/in.txt");
> > > > >> > >> > data.writeAsText("hdfs://localhost:9000/flink/out.txt");
> > > > >> > >> > envRemote.execute();
> > > > >> > >> >
> > > > >> > >> >
> > > > >> > >> > Please advise,
> > > > >> > >> >
> > > > >> > >> > Hanan Meyer
> > > > >> > >> >
> > > > >> > >>
> > > > >> > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: Apache Flink:ProgramInvocationException on Yarn

hanan meyer
Hello all.

Firstly, thank you for your valuable advice.
We ran some very fine-grained pinpoint tests and came to the following
conclusions:

1. We run Flink for Hadoop 2.7 on Ubuntu 14.
2. Once we copied our Java client program directly to the machine and ran it
there, it worked fine.
The program is:

.....

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment envRemote = ExecutionEnvironment.createRemoteEnvironment(
                "localhost", 6123, "/usr/local/HananTestProj.jar");

        DataSet<String> text = envRemote.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

.....
Program works fine
3.Now we are trying to run this program remotely , from windows machine
when the first row looks differently
ExecutionEnvironment envRemote
=ExecutionEnvironment.createRemoteEnvironment("1.2.3.4", 6123,
"C:\\HananTestProj.jar");
 when 1.2.3.4   is IP address of fink machine

4. We got an exception: "JobManager at 1.2.3.4 can't be reached".

5. In the Flink configuration we found the line jobmanager.rpc.address:
localhost.
Flink can't be started with any other value (hostname/IP address) except
localhost.


6. We assume that Flink probably has a critical bug: it can't be reached
from a remote machine, only locally. Are we right? Are we wrong? Should we
file a JIRA issue?
Or maybe we need to configure Flink differently?

Please advise.
Best regards
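
A possible explanation for point 5 (an assumption, not confirmed in this
thread): with jobmanager.rpc.address set to localhost, the JobManager binds
only to the loopback interface, so remote clients cannot reach it even when
the host and port are correct. A sketch of the relevant flink-conf.yaml
entries, with a placeholder hostname:

```yaml
# flink-conf.yaml (sketch; the hostname below is a placeholder)
# Use an address that resolves both on the server itself and from the
# client machine, then restart Flink so the JobManager rebinds.
jobmanager.rpc.address: flink-master.example.com   # or the machine's external IP
jobmanager.rpc.port: 6123
```

If Flink refuses to start with a non-localhost value, that usually points to
the hostname not resolving on the server itself (e.g. missing /etc/hosts
entry) rather than to a Flink bug.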



On Sun, Aug 30, 2015 at 3:19 PM, Robert Metzger <[hidden email]> wrote:

> The output of the YARN session should look like this:
>
> Flink JobManager is now running on quickstart.cloudera:39956
> JobManager Web Interface:
> http://quickstart.cloudera:8088/proxy/application_1440768826963_0005/
> Number of connected TaskManagers changed to 1. Slots available: 1
>
>
>
>
> On Sun, Aug 30, 2015 at 11:12 AM, Stephan Ewen <[hidden email]> wrote:
>
> > The only thing I can think of is that you are not using the right
> host/port
> > for the JobManager.
> >
> > When you start the YARN session, it should print the host where the
> > JobManager runs. You also need to take the port from there, as in YARN,
> the
> > port is usually not 6123. Yarn starts many services on one machine, so
> the
> > ports need to be randomized.
> >
> > It may be worth adding a YARNExecutionEnvironment at some point, which
> > deals with this transparent (starting the YARN cluster, connecting to the
> > JobManager).
> >
> > On Sun, Aug 30, 2015 at 10:12 AM, Hanan Meyer <[hidden email]>
> wrote:
> >
> > > Hello.
> > > Let me clarify the situation.
> > > 1. We are using flink 0.9.0 for Hadoop 2.7. We connected it to HDFS
> > 2.7.1.
> > > 2. Locally, our program is working: once we run flink as
> > ./start-local.sh,
> > > we are able to connect and run the createRemoteEnvironment and Execute
> > > methods.
> > > 3.Due to our architecture and basic Flink feature we want to invoke
> this
> > > functionality REMOTELY , when our Java code is calling the Flink
> methods
> > > from another server.
> > > 4.We tried both ExecutionEnvironment.createRemoteEnvironment("1.2.3.1",
> > > 6123, "TestProj.jar"); and
> ExecutionEnvironment.createRemoteEnvironment("
> > > flink@1.2.3.1", 6123, "TestProj.jar"); (which is definitely not right
> > > since
> > > it should be an IP address) - it crash on the "cant reach JobManager"
> > > error.
> > >
> > > It seems to us that it can be  one of 2 issues.
> > > 1.Somehow we need to configure flink to accept the connections from the
> > > remote machine
> > > 2.Flink has a critical showstopper bug that jeopardizing a whole
> decision
> > > to use this technology.
> > >
> > > Please advise us how we should advance.
> > >
> > >
> > >
> > >
> > > On Fri, Aug 28, 2015 at 10:27 AM, Robert Metzger <[hidden email]>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > in the exception you've posted earlier, you can see the following
> root
> > > > cause:
> > > >
> > > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> > > > Path(/user/jobmanager)]
> > > >
> > > > This string "akka.tcp://flink@FLINK_SERVER_URL:6123/" usually looks
> > like
> > > > this: "akka.tcp://flink@1.2.3.4:6123/". So it seems that you are
> > > > passing FLINK_SERVER_URL
> > > > as the server hostname (or ip).
> > > > Can you pass the correct hostname when you call ExecutionEnvironment.
> > > > createRemoteEnvironment().
> > > >
> > > > On Fri, Aug 28, 2015 at 7:52 AM, Hanan Meyer <[hidden email]>
> > wrote:
> > > >
> > > > > Hi
> > > > >  I'm currently using flink 0.9.0 which by maven support Hadoop 1 .
> > > > > By using flink-clients-0.7.0-hadoop2-incubating.jar with
> > > executePlan(Plan
> > > > > p) method  instead, I'm getting the same exception
> > > > >
> > > > > Hanan
> > > > >
> > > > > On Fri, Aug 28, 2015 at 8:35 AM, Hanan Meyer <[hidden email]>
> > > wrote:
> > > > >
> > > > > >
> > > > > > Hi
> > > > > >
> > > > > > 1. I have restarted Flink service via stop/start-loval.sh - it
> have
> > > > been
> > > > > > restarted successfully ,no errors in log folder
> > > > > > 2. default flink port is -6123
> > > > > >
> > > > > > Getting this via Eclips IDE:
> > > > > >
> > > > > > Thanks
> > > > > >
> > > > > >
> > > > > > [stack trace snipped - identical to the trace quoted earlier in the thread]
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> > > > > > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
> > > > > > at
> akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
> > > > > > at
> akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
> > > > > > at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
> > > > > > at
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
> > > > > > at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
> > > > > > at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
> > > > > > at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
> > > > > > at
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> > > > > > at
> > > > > >
> > > >
> > akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> > > > > > at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> > > > > > at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> > > > > > at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> > > > > > at
> > akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
> > > > > > at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> > > > > > at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > > > > > at
> > > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > > > at
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> > > > > > at
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> > > > > > at
> > > > >
> > >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > > > at
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > > > >
> > > > > >
> > > > > > On Thu, Aug 27, 2015 at 10:47 PM, Robert Metzger <
> > > [hidden email]>
> > > > > > wrote:
> > > > > >
> > > > > >> I guess you are getting an entire exception after the
> > > > "org.apache.flink
> > > > > >> .client.program.ProgramInvocationException: Failed to
> > > > > >> resolve JobManager".
> > > > > >> Can you post it here to help us understand the issue?
> > > > > >>
> > > > > >> On Thu, Aug 27, 2015 at 6:55 PM, Alexey Sapozhnikov <
> > > > > [hidden email]>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Hello all.
> > > > > >> >
> > > > > >> > Some clarification: locally everything works great.
> > > > > >> > However, once we run Flink on a remote Linux machine and try
> to
> > > run
> > > > > the
> > > > > >> > client program from our machine using createRemoteEnvironment,
> > the
> > > > > Flink
> > > > > >> > JobManager raises this exception
> > > > > >> >
> > > > > >> > On Thu, Aug 27, 2015 at 7:41 PM, Stephan Ewen <
> [hidden email]
> > >
> > > > > wrote:
> > > > > >> >
> > > > > >> > > If you start the job via the "bin/flink" script, then simply
> > use
> > > > > >> > > "ExecutionEnvironment.getExecutionEnvironment()" rather than
> > > > > creating
> > > > > >> a
> > > > > >> > > remote environment manually.
> > > > > >> > >
> > > > > >> > > That way, hosts and ports are configured automatically.
> > > > > >> > >
> > > > > >> > > On Thu, Aug 27, 2015 at 6:39 PM, Robert Metzger <
> > > > > [hidden email]>
> > > > > >> > > wrote:
> > > > > >> > >
> > > > > >> > >> Hi,
> > > > > >> > >>
> > > > > >> > >> Which values did you use for FLINK_SERVER_URL and
> FLINK_PORT?
> > > > > >> > >> Every time you deploy Flink on YARN, the host and port
> > change,
> > > > > >> because
> > > > > >> > the
> > > > > >> > >> JobManager is started on a different YARN container.
> > > > > >> > >>
> > > > > >> > >>
> > > > > >> > >> On Thu, Aug 27, 2015 at 6:32 PM, Hanan Meyer <
> > > [hidden email]
> > > > >
> > > > > >> > wrote:
> > > > > >> > >>
> > > > > >> > >> > Hello All
> > > > > >> > >> >
> > > > > >> > >> > When using Eclipse IDE to submit Flink to Yarn single
> node
> > > > > cluster
> > > > > >> I'm
> > > > > >> > >> > getting :
> > > > > >> > >> >
> > "org.apache.flink.client.program.ProgramInvocationException:
> > > > > >> Failed to
> > > > > >> > >> > resolve JobManager"
> > > > > >> > >> >
> > > > > >> > >> > Using Flink 0.9.0
> > > > > >> > >> >
> > > > > >> > >> > The Jar copy a file from one location in Hdfs to another
> > and
> > > > > works
> > > > > >> > fine
> > > > > >> > >> > while executed locally on the single node Yarn cluster -
> > > > > >> > >> > bin/flink run -c Test ./examples/MyJar.jar
> > > > > >> > >> > hdfs://localhost:9000/flink/in.txt
> > > > > >> hdfs://localhost:9000/flink/out.txt
> > > > > >> > >> >
> > > > > >> > >> > The code skeleton:
> > > > > >> > >> >
> > > > > >> > >> >     ExecutionEnvironment envRemote =
> > > > > >> > >> > ExecutionEnvironment.createRemoteEnvironment
> > > > > >> > >> > (FLINK_SERVER_URL,FLINK PORT,JAR_PATH_ON_CLIENT);
> > > > > >> > >> > DataSet<String> data =
> > > > > >> > >> >
> > envRemote.readTextFile("hdfs://localhost:9000/flink/in.txt");
> > > > > >> > >> > data.writeAsText("hdfs://localhost:9000/flink/out.txt");
> > > > > >> > >> > envRemote.execute();
> > > > > >> > >> >
> > > > > >> > >> >
> > > > > >> > >> > Please advise,
> > > > > >> > >> >
> > > > > >> > >> > Hanan Meyer
> > > > > >> > >> >
> > > > > >> > >>
> > > > > >> > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Apache Flink:ProgramInvocationException on Yarn

Stephan Ewen
Do you start Flink via YARN? In that case the "jobmanager.rpc.address" is
not used, because YARN assigns containers/nodes.

If you start Flink in "standalone" mode, this should be the address of the
node that runs the JobManager. It will be used as the host/IP that Flink
binds to. The same host should then be used in the RemoteEnvironment.

Stephan
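
In the standalone case, the address in question is set in conf/flink-conf.yaml. A minimal sketch, with 1.2.3.4 standing in as a placeholder for an address that remote clients can actually route to:

```yaml
# Standalone-mode sketch; 1.2.3.4 is a placeholder address.
# It must be routable from client machines, so "localhost" only
# works for clients running on the JobManager machine itself.
jobmanager.rpc.address: 1.2.3.4
jobmanager.rpc.port: 6123
```

The same host and port would then go into ExecutionEnvironment.createRemoteEnvironment(host, port, jarPath).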


On Sun, Aug 30, 2015 at 3:25 PM, Hanan Meyer <[hidden email]> wrote:

> Hello all.
>
> First, thank you for your valuable advice.
> We did some very fine-grained pinpoint tests and came to the following
> conclusions:
>
> 1. We run Flink for Hadoop 2.7 on Ubuntu 14
> 2. Once we copied our Java client program directly to the machine and ran it
> there, it worked very well
> The program is
>
> .....
>
> ExecutionEnvironment envRemote
> =ExecutionEnvironment.createRemoteEnvironment("localhost", 6123,
> "/usr/local/HananTestProj.jar");
>
>
> org.apache.flink.api.java.DataSet text =
> (org.apache.flink.api.java.DataSet) envRemote.fromElements(
>            "Who's there?",
>            "I think I hear them. Stand, ho! Who's there?");
>
>        org.apache.flink.api.java.DataSet<Tuple2<String, Integer>>
> wordCounts = text
>            .flatMap(new LineSplitter())
>            .groupBy(0)
>            .sum(1);
>
>        wordCounts.print();
>    }
>
>    public static class LineSplitter implements FlatMapFunction<String,
> Tuple2<String, Integer>> {
>        public void flatMap(String line, Collector<Tuple2<String, Integer>>
> out) {
>            for (String word : line.split(" ")) {
>                out.collect(new Tuple2<String, Integer>(word, 1));
>            }
>        }
>    }
>
> .....
> The program works fine.
> 3. Now we are trying to run this program remotely, from a Windows machine,
> where the first row looks different:
> ExecutionEnvironment envRemote
> =ExecutionEnvironment.createRemoteEnvironment("1.2.3.4", 6123,
> "C:\\HananTestProj.jar");
> where 1.2.3.4 is the IP address of the Flink machine
>
> 4. We got an exception: "JobManager at 1.2.3.4 can't be reached", and so on
>
> 5. In the Flink configuration we found the following line: jobmanager.rpc.address:
> localhost
> Flink can't be started with any other value (hostname/IP address) than
> localhost
>
>
> 6. We assume that Flink probably has a critical bug: it can't be reached
> from a remote machine, only locally. Are we right? Are we wrong? Should we
> file a JIRA issue?
> Or maybe we need to configure Flink differently?
>
> Please advise
> Best regards
>
>
>
> On Sun, Aug 30, 2015 at 3:19 PM, Robert Metzger <[hidden email]>
> wrote:
>
> > The output of the YARN session should look like this:
> >
> > Flink JobManager is now running on quickstart.cloudera:39956
> > JobManager Web Interface:
> > http://quickstart.cloudera:8088/proxy/application_1440768826963_0005/
> > Number of connected TaskManagers changed to 1. Slots available: 1
> >
> >
> >
> >
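
The banner quoted above is where the JobManager host and port come from when Flink runs on YARN. A small sketch of pulling them out of that line; the class and method names here are our own illustration, not part of Flink's API:

```java
// Hypothetical helper (not part of Flink) that extracts the JobManager host
// and port from the YARN session banner line.
public class JobManagerAddress {
    final String host;
    final int port;

    JobManagerAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Parses e.g. "Flink JobManager is now running on quickstart.cloudera:39956".
    static JobManagerAddress fromSessionBanner(String line) {
        String prefix = "Flink JobManager is now running on ";
        if (!line.startsWith(prefix)) {
            throw new IllegalArgumentException("Unexpected banner line: " + line);
        }
        String hostPort = line.substring(prefix.length()).trim();
        int colon = hostPort.lastIndexOf(':');
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        return new JobManagerAddress(host, port);
    }

    public static void main(String[] args) {
        JobManagerAddress a = JobManagerAddress.fromSessionBanner(
                "Flink JobManager is now running on quickstart.cloudera:39956");
        System.out.println(a.host + ":" + a.port);
    }
}
```

The extracted host and port are what would then be handed to ExecutionEnvironment.createRemoteEnvironment(host, port, jarPath).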
> > On Sun, Aug 30, 2015 at 11:12 AM, Stephan Ewen <[hidden email]> wrote:
> >
> > > The only thing I can think of is that you are not using the right
> > host/port
> > > for the JobManager.
> > >
> > > When you start the YARN session, it should print the host where the
> > > JobManager runs. You also need to take the port from there, as in YARN,
> > the
> > > port is usually not 6123. Yarn starts many services on one machine, so
> > the
> > > ports need to be randomized.
> > >
> > > It may be worth adding a YARNExecutionEnvironment at some point, which
> > > deals with this transparently (starting the YARN cluster, connecting to
> the
> > > JobManager).
> > >
> > > On Sun, Aug 30, 2015 at 10:12 AM, Hanan Meyer <[hidden email]>
> > wrote:
> > >
> > > > Hello.
> > > > Let me clarify the situation.
> > > > 1. We are using flink 0.9.0 for Hadoop 2.7. We connected it to HDFS
> > > 2.7.1.
> > > > 2. Locally, our program is working: once we run flink as
> > > ./start-local.sh,
> > > > we are able to connect and run the createRemoteEnvironment and
> Execute
> > > > methods.
> > > > 3. Due to our architecture and a basic Flink feature, we want to invoke
> > this
> > > > functionality REMOTELY, with our Java code calling the Flink
> > methods
> > > > from another server.
> > > > 4.We tried both
> ExecutionEnvironment.createRemoteEnvironment("1.2.3.1",
> > > > 6123, "TestProj.jar"); and
> > ExecutionEnvironment.createRemoteEnvironment("
> > > > flink@1.2.3.1", 6123, "TestProj.jar"); (which is definitely not
> right
> > > > since
> > > > it should be an IP address) - it crash on the "cant reach JobManager"
> > > > error.
> > > >
> > > > It seems to us that it can be one of two issues:
> > > > 1. Somehow we need to configure Flink to accept the connections from
> the
> > > > remote machine
> > > > 2. Flink has a critical showstopper bug that jeopardizes the whole
> > decision
> > > > to use this technology.
> > > >
> > > > Please advise us how we should advance.
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Aug 28, 2015 at 10:27 AM, Robert Metzger <
> [hidden email]>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > in the exception you've posted earlier, you can see the following
> > root
> > > > > cause:
> > > > >
> > > > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> > > > > Path(/user/jobmanager)]
> > > > >
> > > > > This string "akka.tcp://flink@FLINK_SERVER_URL:6123/" usually
> looks
> > > like
> > > > > this: "akka.tcp://flink@1.2.3.4:6123/". So it seems that you are
> > > > > passing FLINK_SERVER_URL
> > > > > as the server hostname (or ip).
> > > > > Can you pass the correct hostname when you call
> ExecutionEnvironment.
> > > > > createRemoteEnvironment().
> > > > >
> > > > > On Fri, Aug 28, 2015 at 7:52 AM, Hanan Meyer <[hidden email]>
> > > wrote:
> > > > >
> > > > > > Hi
> > > > > >  I'm currently using flink 0.9.0 which by maven support Hadoop 1
> .
> > > > > > By using flink-clients-0.7.0-hadoop2-incubating.jar with
> > > > executePlan(Plan
> > > > > > p) method  instead, I'm getting the same exception
> > > > > >
> > > > > > Hanan
>

Re: Apache Flink:ProgramInvocationException on Yarn

Alexey Sapozhnikov
Hello Stephan.

We run this Linux machine on Amazon, which I predict most people
will do.
We tried to put "0.0.0.0" or the public IP of the machine - Flink crashes on
start; it doesn't recognize itself.
It is very strange that it doesn't work with 0.0.0.0 - in Java this is
basically the way to make a service reachable from the outside.
We also tried the hostname - what you get from the "hostname" command.
It crashes.
It works only with "localhost", and then only locally.
So what do you suggest we put there so that a remote client can connect?

Best regards
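
Independent of which address ends up in the configuration, a plain TCP probe run from the client machine helps separate firewall or bind-address problems from Flink problems. A minimal sketch; the class name is our own, and it only checks that something accepts connections on the given port, not that a JobManager actually answers there:

```java
// Simple TCP reachability probe for the JobManager endpoint, to be run from
// the client machine. A "false" result points at firewalls, security groups,
// or a JobManager bound only to localhost.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class JobManagerProbe {

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts, and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("usage: JobManagerProbe <host> <port>");
            return;
        }
        // e.g. "java JobManagerProbe 1.2.3.4 6123" (placeholder address).
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        System.out.println(host + ":" + port + " reachable: "
                + isReachable(host, port, 2000));
    }
}
```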

On Sun, Aug 30, 2015 at 4:34 PM, Stephan Ewen <[hidden email]> wrote:

> Do you start Flink via YARN? In that case the "jobmanager.rpc.address" is
> not used, because YARN assigns containers/nodes.
>
> If you start Flink in "standalone" mode, this should be the address of the
> node that runs the JobManager. It will be used as the host/IP that Flink
> binds to. The same host should then be used in the RemoteEnvironment.
>
> Stephan
>
>
> On Sun, Aug 30, 2015 at 3:25 PM, Hanan Meyer <[hidden email]> wrote:
>
> > Hello all.
> >
> > Firstly- thank you for your valuable advices.
> > We did some very fine tuned pinpoint test and comes to following
> > conclusions
> >
> > 1.We run on Ubuntu 14 flink for hadoop 2.7
> > 2.Once we copy our Java client program directy to the machine and run it
> > directly there it worked very good
> > The program is
> >
> > .....
> >
> > ExecutionEnvironment envRemote
> > =ExecutionEnvironment.createRemoteEnvironment("localhost", 6123,
> > "\usr\local\HananTestProj.jar");
> >
> >
> > org.apache.flink.api.java.DataSet text =
> > (org.apache.flink.api.java.DataSet) envRemote.fromElements(
> >            "Who's there?",
> >            "I think I hear them. Stand, ho! Who's there?");
> >
> >        org.apache.flink.api.java.DataSet<Tuple2<String, Integer>>
> > wordCounts = text
> >            .flatMap(new LineSplitter())
> >            .groupBy(0)
> >            .sum(1);
> >
> >        wordCounts.print();
> >    }
> >
> >    public static class LineSplitter implements FlatMapFunction<String,
> > Tuple2<String, Integer>> {
> >        public void flatMap(String line, Collector<Tuple2<String,
> Integer>>
> > out) {
> >            for (String word : line.split(" ")) {
> >                out.collect(new Tuple2<String, Integer>(word, 1));
> >            }
> >        }
> >    }
> >
> > .....
> > Program works fine
> > 3.Now we are trying to run this program remotely , from windows machine
> > when the first row looks differently
> > ExecutionEnvironment envRemote
> > =ExecutionEnvironment.createRemoteEnvironment("1.2.3.4", 6123,
> > "C:\\HananTestProj.jar");
> >  when 1.2.3.4   is IP address of fink machine
> >
> > 4.We got an exception :Jobmanager at 1.2.3.4 cant be reached bla bla bla
> >
> > 5.in flink configuration we found a following line
> jobmanager.rpc.address:
> > localhost
> > Flink cant be started with any other value (hostname/ipaddress ) except
> the
> > localhost
> >
> >
> > 6.We assume that probably Flink has a critical bug : it cant be started
> > from remote machine, only locally. Are we right? Are we wrong? Should we
> > fill JIRA ?
> > Maybe we need somehow to configure Flink differently?
> >
> > Please advice
> > Best regards
> >
> >
> >
> > On Sun, Aug 30, 2015 at 3:19 PM, Robert Metzger <[hidden email]>
> > wrote:
> >
> > > The output of the YARN session should look like this:
> > >
> > > Flink JobManager is now running on quickstart.cloudera:39956
> > > JobManager Web Interface:
> > > http://quickstart.cloudera:8088/proxy/application_1440768826963_0005/
> > > Number of connected TaskManagers changed to 1. Slots available: 1
> > >
> > >
> > >
> > >
> > > On Sun, Aug 30, 2015 at 11:12 AM, Stephan Ewen <[hidden email]>
> wrote:
> > >
> > > > The only thing I can think of is that you are not using the right
> > > host/port
> > > > for the JobManager.
> > > >
> > > > When you start the YARN session, it should print the host where the
> > > > JobManager runs. You also need to take the port from there, as in
> YARN,
> > > the
> > > > port is usually not 6123. Yarn starts many services on one machine,
> so
> > > the
> > > > ports need to be randomized.
> > > >
> > > > It may be worth adding a YARNExecutionEnvironment at some point,
> which
> > > > deals with this transparent (starting the YARN cluster, connecting to
> > the
> > > > JobManager).
> > > >
> > > > On Sun, Aug 30, 2015 at 10:12 AM, Hanan Meyer <[hidden email]>
> > > wrote:
> > > >
> > > > > Hello.
> > > > > Let me clarify the situation.
> > > > > 1. We are using flink 0.9.0 for Hadoop 2.7. We connected it to HDFS
> > > > 2.7.1.
> > > > > 2. Locally, our program is working: once we run flink as
> > > > ./start-local.sh,
> > > > > we are able to connect and run the createRemoteEnvironment and
> > Execute
> > > > > methods.
> > > > > 3.Due to our architecture and basic Flink feature we want to invoke
> > > this
> > > > > functionality REMOTELY , when our Java code is calling the Flink
> > > methods
> > > > > from another server.
> > > > > 4.We tried both
> > ExecutionEnvironment.createRemoteEnvironment("1.2.3.1",
> > > > > 6123, "TestProj.jar"); and
> > > ExecutionEnvironment.createRemoteEnvironment("
> > > > > flink@1.2.3.1", 6123, "TestProj.jar"); (which is definitely not
> > right
> > > > > since
> > > > > it should be an IP address) - it crash on the "cant reach
> JobManager"
> > > > > error.
> > > > >
> > > > > It seems to us that it can be  one of 2 issues.
> > > > > 1.Somehow we need to configure flink to accept the connections from
> > the
> > > > > remote machine
> > > > > 2.Flink has a critical showstopper bug that jeopardizing a whole
> > > decision
> > > > > to use this technology.
> > > > >
> > > > > Please advise us how we should advance.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Aug 28, 2015 at 10:27 AM, Robert Metzger <
> > [hidden email]>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > in the exception you've posted earlier, you can see the following
> > > root
> > > > > > cause:
> > > > > >
> > > > > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > > > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> > > > > > Path(/user/jobmanager)]
> > > > > >
> > > > > > This string "akka.tcp://flink@FLINK_SERVER_URL:6123/" usually
> > looks
> > > > like
> > > > > > this: "akka.tcp://flink@1.2.3.4:6123/". So it seems that you are
> > > > > > passing FLINK_SERVER_URL
> > > > > > as the server hostname (or ip).
> > > > > > Can you pass the correct hostname when you call
> > ExecutionEnvironment.
> > > > > > createRemoteEnvironment().
> > > > > >
> > > > > > On Fri, Aug 28, 2015 at 7:52 AM, Hanan Meyer <[hidden email]
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi
> > > > > > >  I'm currently using flink 0.9.0 which by maven support Hadoop
> 1
> > .
> > > > > > > By using flink-clients-0.7.0-hadoop2-incubating.jar with
> > > > > executePlan(Plan
> > > > > > > p) method  instead, I'm getting the same exception
> > > > > > >
> > > > > > > Hanan
> > > > > > >
> > > > > > > On Fri, Aug 28, 2015 at 8:35 AM, Hanan Meyer <
> [hidden email]
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > >
> > > > > > > > Hi
> > > > > > > >
> > > > > > > > 1. I have restarted the Flink service via stop/start-local.sh -
> > > > > > > > it has been restarted successfully, no errors in the log folder
> > > > > > > > 2. The default Flink port is 6123
> > > > > > > >
> > > > > > > > Getting this via the Eclipse IDE:
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > >
> > > > > > > >
> > > > > > > > org.apache.flink.client.program.ProgramInvocationException:
> > > Failed
> > > > to
> > > > > > > > resolve JobManager
> > > > > > > > at
> org.apache.flink.client.program.Client.run(Client.java:379)
> > > > > > > > at
> org.apache.flink.client.program.Client.run(Client.java:356)
> > > > > > > > at
> org.apache.flink.client.program.Client.run(Client.java:349)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> > > > > > > > at Test.main(Test.java:39)
> > > > > > > > Caused by: java.io.IOException: JobManager at
> > > > > > > > akka.tcp://flink@FLINK_SERVER_URL:6123/user/jobmanager not
> > > > > reachable.
> > > > > > > > Please make sure that the JobManager is running and its port
> is
> > > > > > > reachable.
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1197)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1221)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1239)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
> > > > > > > > at
> org.apache.flink.client.program.Client.run(Client.java:376)
> > > > > > > > ... 7 more
> > > > > > > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > > > > > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL
> > :6123/),
> > > > > > > > Path(/user/jobmanager)]
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> > > > > > > > at
> scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > > > > > > > at akka.dispatch.BatchingExecutor$
> > > > > > > >
> Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > > > > > > at
> > > > > >
> > > scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> > > > > > > > at
> > > > > akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> > > > > > > > at
> > > > > > >
> > > > >
> > >
> akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> > > > > > > > at
> > > > > > > >
> > > > > >
> > > >
> > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> > > > > > > > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
> > > > > > > > at
> > > akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
> > > > > > > > at
> > > akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
> > > > > > > > at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
> > > > > > > > at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
> > > > > > > > at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
> > > > > > > > at
> akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> > > > > > > > at
> > > > > > > >
> > > > > >
> > > >
> > akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> > > > > > > > at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> > > > > > > > at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> > > > > > > > at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> > > > > > > > at
> > > > akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
> > > > > > > > at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> > > > > > > > at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > > > > > > > at
> > > > >
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> > > > > > > > at
> > > > > > >
> > > > >
> > >
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > > > > > at
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Aug 27, 2015 at 10:47 PM, Robert Metzger <
> > > > > [hidden email]>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> I guess you are getting an entire exception after the
> > > > > > "org.apache.flink
> > > > > > > >> .client.program.ProgramInvocationException: Failed to
> > > > > > > >> resolve JobManager".
> > > > > > > >> Can you post it here to help us understanding the issue?
> > > > > > > >>
> > > > > > > >> On Thu, Aug 27, 2015 at 6:55 PM, Alexey Sapozhnikov <
> > > > > > > [hidden email]>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >> > Hello all.
> > > > > > > >> >
> > > > > > > >> > Some clarification: locally everything works great.
> > > > > > > >> > However once we run our Flink on remote linux machine and
> > try
> > > to
> > > > > run
> > > > > > > the
> > > > > > > >> > client program from our machine, using create remote
> > > > environment-
> > > > > > > Flink
> > > > > > > >> > JobManager is raising this exception
> > > > > > > >> >
> > > > > > > >> > On Thu, Aug 27, 2015 at 7:41 PM, Stephan Ewen <
> > > [hidden email]
> > > > >
> > > > > > > wrote:
> > > > > > > >> >
> > > > > > > >> > > If you start the job via the "bin/flink" script, then
> > simply
> > > > use
> > > > > > > >> > > "ExecutionEnvironment.getExecutionEnvironment()" rather
> > then
> > > > > > > creating
> > > > > > > >> a
> > > > > > > >> > > remote environment manually.
> > > > > > > >> > >
> > > > > > > >> > > That way, hosts and ports are configured automatically.
> > > > > > > >> > >
> > > > > > > >> > > On Thu, Aug 27, 2015 at 6:39 PM, Robert Metzger <
> > > > > > > [hidden email]>
> > > > > > > >> > > wrote:
> > > > > > > >> > >
> > > > > > > >> > >> Hi,
> > > > > > > >> > >>
> > > > > > > >> > >> Which values did you use for FLINK_SERVER_URL and
> > > FLINK_PORT?
> > > > > > > >> > >> Every time you deploy Flink on YARN, the host and port
> > > > change,
> > > > > > > >> because
> > > > > > > >> > the
> > > > > > > >> > >> JobManager is started on a different YARN container.
> > > > > > > >> > >>
> > > > > > > >> > >>
> > > > > > > >> > >> On Thu, Aug 27, 2015 at 6:32 PM, Hanan Meyer <
> > > > > [hidden email]
> > > > > > >
> > > > > > > >> > wrote:
> > > > > > > >> > >>
> > > > > > > >> > >> > Hello All
> > > > > > > >> > >> >
> > > > > > > >> > >> > When using Eclipse IDE to submit Flink to Yarn single
> > > node
> > > > > > > cluster
> > > > > > > >> I'm
> > > > > > > >> > >> > getting :
> > > > > > > >> > >> >
> > > > "org.apache.flink.client.program.ProgramInvocationException:
> > > > > > > >> Failed to
> > > > > > > >> > >> > resolve JobManager"
> > > > > > > >> > >> >
> > > > > > > >> > >> > Using Flink 0.9.0
> > > > > > > >> > >> >
> > > > > > > >> > >> > The Jar copy a file from one location in Hdfs to
> > another
> > > > and
> > > > > > > works
> > > > > > > >> > fine
> > > > > > > >> > >> > while executed locally on the single node Yarn
> cluster
> > -
> > > > > > > >> > >> > bin/flink run -c Test ./examples/MyJar.jar
> > > > > > > >> > >> > hdfs://localhost:9000/flink/in.txt
> > > > > > > >> hdfs://localhost:9000/flink/out.txt
> > > > > > > >> > >> >
> > > > > > > >> > >> > The code skeleton:
> > > > > > > >> > >> >
> > > > > > > >> > >> >     ExecutionEnvironment envRemote =
> > > > > > > >> > >> > ExecutionEnvironment.createRemoteEnvironment
> > > > > > > >> > >> > (FLINK_SERVER_URL,FLINK PORT,JAR_PATH_ON_CLIENT);
> > > > > > > >> > >> > DataSet<String> data =
> > > > > > > >> > >> >
> > > > envRemote.readTextFile("hdfs://localhost:9000/flink/in.txt");
> > > > > > > >> > >> >
> > data.writeAsText("hdfs://localhost:9000/flink/out.txt");
> > > > > > > >> > >> > envRemote.execute();
> > > > > > > >> > >> >
> > > > > > > >> > >> >
> > > > > > > >> > >> > Please advise,
> > > > > > > >> > >> >
> > > > > > > >> > >> > Hanan Meyer
> > > > > > > >> > >> >
> > > > > > > >> > >>
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>



--

*Regards*

*Alexey Sapozhnikov*
CTO& Co-Founder
Scalabillit Inc
Aba Even 10-C, Herzelia, Israel
M : +972-52-2363823
E : [hidden email]
W : http://www.scalabill.it
YT - https://youtu.be/9Rj309PTOFA
Map:http://mapta.gs/Scalabillit
Revolutionizing Proof-of-Concept

Re: Apache Flink:ProgramInvocationException on Yarn

Robert Metzger
How is Flink crashing when you start it on the Linux machine in Amazon?

Can you post the exception here?

On Sun, Aug 30, 2015 at 3:48 PM, Alexey Sapozhnikov <[hidden email]>
wrote:

> Hello Stephan.
>
> We run this Linux machine on Amazon, which I expect most people will do.
> We tried to put "0.0.0.0" or the public IP of the machine - Flink crashes on
> start; it doesn't recognize itself.
> It is very strange that it doesn't work with 0.0.0.0 - in Java, binding to
> 0.0.0.0 is the usual way to make a service visible on all interfaces.
> We also tried the hostname - what the "hostname" command returns. It crashes.
> It works only with "localhost", and only locally.
> So what do you suggest we put there so that the remote client can connect?
>
> Best regards
>
> On Sun, Aug 30, 2015 at 4:34 PM, Stephan Ewen <[hidden email]> wrote:
>
> > Do you start Flink via YARN? In that case the "jobmanager.rpc.address" is
> > not used, because YARN assigns containers/nodes.
> >
> > If you start Flink in "standalone" mode, this should be the address of
> the
> > node that runs the JobManager. It will be used as the host/IP that Flink
> > binds to. The same host should then be used in the RemoteEnvironment.
> >
> > Stephan
> >
> >
> > On Sun, Aug 30, 2015 at 3:25 PM, Hanan Meyer <[hidden email]> wrote:
> >
> > > Hello all.
> > >
> > > Firstly- thank you for your valuable advices.
> > > We did some very fine tuned pinpoint test and comes to following
> > > conclusions
> > >
> > > 1. We run Flink for Hadoop 2.7 on Ubuntu 14.
> > > 2. Once we copy our Java client program directly to the machine and run
> > > it directly there, it works very well.
> > > The program is
> > >
> > > .....
> > >
> > > ExecutionEnvironment envRemote
> > > =ExecutionEnvironment.createRemoteEnvironment("localhost", 6123,
> > > "/usr/local/HananTestProj.jar");
> > >
> > >
> > > org.apache.flink.api.java.DataSet text =
> > > (org.apache.flink.api.java.DataSet) envRemote.fromElements(
> > >            "Who's there?",
> > >            "I think I hear them. Stand, ho! Who's there?");
> > >
> > >        org.apache.flink.api.java.DataSet<Tuple2<String, Integer>>
> > > wordCounts = text
> > >            .flatMap(new LineSplitter())
> > >            .groupBy(0)
> > >            .sum(1);
> > >
> > >        wordCounts.print();
> > >    }
> > >
> > >    public static class LineSplitter implements FlatMapFunction<String,
> > > Tuple2<String, Integer>> {
> > >        public void flatMap(String line, Collector<Tuple2<String,
> > Integer>>
> > > out) {
> > >            for (String word : line.split(" ")) {
> > >                out.collect(new Tuple2<String, Integer>(word, 1));
> > >            }
> > >        }
> > >    }
> > >
> > > .....
> > > Program works fine
> > > 3.Now we are trying to run this program remotely , from windows machine
> > > when the first row looks differently
> > > ExecutionEnvironment envRemote
> > > =ExecutionEnvironment.createRemoteEnvironment("1.2.3.4", 6123,
> > > "C:\\HananTestProj.jar");
> > >  when 1.2.3.4   is IP address of fink machine
> > >
> > > 4. We got an exception: JobManager at 1.2.3.4 can't be reached, etc.
> > >
> > > 5. In the Flink configuration we found the following line:
> > > jobmanager.rpc.address: localhost
> > > Flink can't be started with any other value (hostname/IP address) except
> > > localhost.
> > >
> > >
> > > 6. We assume that Flink probably has a critical bug: it can't be started
> > > from a remote machine, only locally. Are we right? Are we wrong? Should
> > > we file a JIRA?
> > > Maybe we need to configure Flink differently?
> > >
> > > Please advise
> > > Best regards
> > >
> > >
> > >
> > > On Sun, Aug 30, 2015 at 3:19 PM, Robert Metzger <[hidden email]>
> > > wrote:
> > >
> > > > The output of the YARN session should look like this:
> > > >
> > > > Flink JobManager is now running on quickstart.cloudera:39956
> > > > JobManager Web Interface:
> > > >
> http://quickstart.cloudera:8088/proxy/application_1440768826963_0005/
> > > > Number of connected TaskManagers changed to 1. Slots available: 1
> > > >
> > > >
> > > >
> > > >
> > > > On Sun, Aug 30, 2015 at 11:12 AM, Stephan Ewen <[hidden email]>
> > wrote:
> > > >
> > > > > The only thing I can think of is that you are not using the right
> > > > host/port
> > > > > for the JobManager.
> > > > >
> > > > > When you start the YARN session, it should print the host where the
> > > > > JobManager runs. You also need to take the port from there, as in
> > YARN,
> > > > the
> > > > > port is usually not 6123. Yarn starts many services on one machine,
> > so
> > > > the
> > > > > ports need to be randomized.
> > > > >
> > > > > It may be worth adding a YARNExecutionEnvironment at some point, which
> > > > > deals with this transparently (starting the YARN cluster, connecting
> > > > > to the JobManager).
> > > > >
> > > > > On Sun, Aug 30, 2015 at 10:12 AM, Hanan Meyer <[hidden email]>
> > > > wrote:
> > > > >
> > > > > > Hello.
> > > > > > Let me clarify the situation.
> > > > > > 1. We are using flink 0.9.0 for Hadoop 2.7. We connected it to
> HDFS
> > > > > 2.7.1.
> > > > > > 2. Locally, our program is working: once we run flink as
> > > > > ./start-local.sh,
> > > > > > we are able to connect and run the createRemoteEnvironment and
> > > Execute
> > > > > > methods.
> > > > > > 3.Due to our architecture and basic Flink feature we want to
> invoke
> > > > this
> > > > > > functionality REMOTELY , when our Java code is calling the Flink
> > > > methods
> > > > > > from another server.
> > > > > > 4.We tried both
> > > ExecutionEnvironment.createRemoteEnvironment("1.2.3.1",
> > > > > > 6123, "TestProj.jar"); and
> > > > ExecutionEnvironment.createRemoteEnvironment("
> > > > > > flink@1.2.3.1", 6123, "TestProj.jar"); (which is definitely not
> > > right
> > > > > > since
> > > > > > it should be an IP address) - it crash on the "cant reach
> > JobManager"
> > > > > > error.
> > > > > >
> > > > > > It seems to us that it can be one of two issues:
> > > > > > 1. Somehow we need to configure Flink to accept connections from
> > > > > > the remote machine.
> > > > > > 2. Flink has a critical showstopper bug that jeopardizes the whole
> > > > > > decision to use this technology.
> > > > > >
> > > > > > Please advise us how we should advance.
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 28, 2015 at 10:27 AM, Robert Metzger <
> > > [hidden email]>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > in the exception you've posted earlier, you can see the
> following
> > > > root
> > > > > > > cause:
> > > > > > >
> > > > > > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > > > > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL
> :6123/),
> > > > > > > Path(/user/jobmanager)]
> > > > > > >
> > > > > > > This string "akka.tcp://flink@FLINK_SERVER_URL:6123/" usually
> > > looks
> > > > > like
> > > > > > > this: "akka.tcp://flink@1.2.3.4:6123/". So it seems that you
> are
> > > > > > > passing FLINK_SERVER_URL
> > > > > > > as the server hostname (or ip).
> > > > > > > Can you pass the correct hostname when you call
> > > ExecutionEnvironment.
> > > > > > > createRemoteEnvironment().
> > > > > > >
> > > > > > > On Fri, Aug 28, 2015 at 7:52 AM, Hanan Meyer <
> [hidden email]
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi
> > > > > > > >  I'm currently using flink 0.9.0 which by maven support
> Hadoop
> > 1
> > > .
> > > > > > > > By using flink-clients-0.7.0-hadoop2-incubating.jar with
> > > > > > executePlan(Plan
> > > > > > > > p) method  instead, I'm getting the same exception
> > > > > > > >
> > > > > > > > Hanan
> > > > > > > >
> > > > > > > > On Fri, Aug 28, 2015 at 8:35 AM, Hanan Meyer <
> > [hidden email]
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > >
> > > > > > > > > Hi
> > > > > > > > >
> > > > > > > > > 1. I have restarted the Flink service via stop/start-local.sh -
> > > > > > > > > it has been restarted successfully, no errors in the log folder
> > > > > > > > > 2. The default Flink port is 6123
> > > > > > > > >
> > > > > > > > > Getting this via the Eclipse IDE:
> > > > > > > > >
> > > > > > > > > Thanks
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > org.apache.flink.client.program.ProgramInvocationException:
> > > > Failed
> > > > > to
> > > > > > > > > resolve JobManager
> > > > > > > > > at
> > org.apache.flink.client.program.Client.run(Client.java:379)
> > > > > > > > > at
> > org.apache.flink.client.program.Client.run(Client.java:356)
> > > > > > > > > at
> > org.apache.flink.client.program.Client.run(Client.java:349)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> > > > > > > > > at Test.main(Test.java:39)
> > > > > > > > > Caused by: java.io.IOException: JobManager at
> > > > > > > > > akka.tcp://flink@FLINK_SERVER_URL:6123/user/jobmanager not
> > > > > > reachable.
> > > > > > > > > Please make sure that the JobManager is running and its
> port
> > is
> > > > > > > > reachable.
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1197)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1221)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1239)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
> > > > > > > > > at
> > org.apache.flink.client.program.Client.run(Client.java:376)
> > > > > > > > > ... 7 more
> > > > > > > > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > > > > > > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL
> > > :6123/),
> > > > > > > > > Path(/user/jobmanager)]
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> > > > > > > > > at
> > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > > > > > > > > at akka.dispatch.BatchingExecutor$
> > > > > > > > >
> > Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > > > > > > > at
> > > > > > >
> > > >
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> > > > > > > > > at
> > > > > >
> akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> > > > > > > > > at
> > > > > > > >
> > > > > >
> > > >
> > akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> > > > > > > > > at
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> > > > > > > > > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
> > > > > > > > > at
> > > > akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
> > > > > > > > > at
> > > > akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
> > > > > > > > > at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
> > > > > > > > > at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
> > > > > > > > > at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
> > > > > > > > > at
> > akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> > > > > > > > > at
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> > > > > > > > > at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> > > > > > > > > at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> > > > > > > > > at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> > > > > > > > > at
> > > > > akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
> > > > > > > > > at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> > > > > > > > > at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > > > > > > > > at
> > > > > >
> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> > > > > > > > > at
> > > > > > > >
> > > > > >
> > > >
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, Aug 27, 2015 at 10:47 PM, Robert Metzger <
> > > > > > [hidden email]>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> I guess you are getting an entire exception after the
> > > > > > > "org.apache.flink
> > > > > > > > >> .client.program.ProgramInvocationException: Failed to
> > > > > > > > >> resolve JobManager".
> > > > > > > > >> Can you post it here to help us understanding the issue?
> > > > > > > > >>
> > > > > > > > >> On Thu, Aug 27, 2015 at 6:55 PM, Alexey Sapozhnikov <
> > > > > > > > [hidden email]>
> > > > > > > > >> wrote:
> > > > > > > > >>
> > > > > > > > >> > Hello all.
> > > > > > > > >> >
> > > > > > > > >> > Some clarification: locally everything works great.
> > > > > > > > >> > However once we run our Flink on remote linux machine
> and
> > > try
> > > > to
> > > > > > run
> > > > > > > > the
> > > > > > > > >> > client program from our machine, using create remote
> > > > > environment-
> > > > > > > > Flink
> > > > > > > > >> > JobManager is raising this exception
> > > > > > > > >> >
> > > > > > > > >> > On Thu, Aug 27, 2015 at 7:41 PM, Stephan Ewen <
> > > > [hidden email]
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >> >
> > > > > > > > >> > > If you start the job via the "bin/flink" script, then
> > > simply
> > > > > use
> > > > > > > > >> > > "ExecutionEnvironment.getExecutionEnvironment()"
> rather
> > > then
> > > > > > > > creating
> > > > > > > > >> a
> > > > > > > > >> > > remote environment manually.
> > > > > > > > >> > >
> > > > > > > > >> > > That way, hosts and ports are configured
> automatically.
> > > > > > > > >> > >
> > > > > > > > >> > > On Thu, Aug 27, 2015 at 6:39 PM, Robert Metzger <
> > > > > > > > [hidden email]>
> > > > > > > > >> > > wrote:
> > > > > > > > >> > >
> > > > > > > > >> > >> Hi,
> > > > > > > > >> > >>
> > > > > > > > >> > >> Which values did you use for FLINK_SERVER_URL and
> > > > FLINK_PORT?
> > > > > > > > >> > >> Every time you deploy Flink on YARN, the host and
> port
> > > > > change,
> > > > > > > > >> because
> > > > > > > > >> > the
> > > > > > > > >> > >> JobManager is started on a different YARN container.
> > > > > > > > >> > >>
> > > > > > > > >> > >>
> > > > > > > > >> > >> On Thu, Aug 27, 2015 at 6:32 PM, Hanan Meyer <
> > > > > > [hidden email]
> > > > > > > >
> > > > > > > > >> > wrote:
> > > > > > > > >> > >>
> > > > > > > > >> > >> > Hello All
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > When using Eclipse IDE to submit Flink to Yarn
> single
> > > > node
> > > > > > > > cluster
> > > > > > > > >> I'm
> > > > > > > > >> > >> > getting :
> > > > > > > > >> > >> >
> > > > > "org.apache.flink.client.program.ProgramInvocationException:
> > > > > > > > >> Failed to
> > > > > > > > >> > >> > resolve JobManager"
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > Using Flink 0.9.0
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > The Jar copy a file from one location in Hdfs to
> > > another
> > > > > and
> > > > > > > > works
> > > > > > > > >> > fine
> > > > > > > > >> > >> > while executed locally on the single node Yarn
> > cluster
> > > -
> > > > > > > > >> > >> > bin/flink run -c Test ./examples/MyJar.jar
> > > > > > > > >> > >> > hdfs://localhost:9000/flink/in.txt
> > > > > > > > >> hdfs://localhost:9000/flink/out.txt
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > The code skeleton:
> > > > > > > > >> > >> >
> > > > > > > > >> > >> >     ExecutionEnvironment envRemote =
> > > > > > > > >> > >> > ExecutionEnvironment.createRemoteEnvironment
> > > > > > > > >> > >> > (FLINK_SERVER_URL,FLINK PORT,JAR_PATH_ON_CLIENT);
> > > > > > > > >> > >> > DataSet<String> data =
> > > > > > > > >> > >> >
> > > > > envRemote.readTextFile("hdfs://localhost:9000/flink/in.txt");
> > > > > > > > >> > >> >
> > > data.writeAsText("hdfs://localhost:9000/flink/out.txt");
> > > > > > > > >> > >> > envRemote.execute();
> > > > > > > > >> > >> >
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > Please advise,
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > Hanan Meyer
> > > > > > > > >> > >> >
> > > > > > > > >> > >>
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
>
> --
>
> *Regards*
>
> *Alexey Sapozhnikov*
> CTO& Co-Founder
> Scalabillit Inc
> Aba Even 10-C, Herzelia, Israel
> M : +972-52-2363823
> E : [hidden email]
> W : http://www.scalabill.it
> YT - https://youtu.be/9Rj309PTOFA
> Map:http://mapta.gs/Scalabillit
> Revolutionizing Proof-of-Concept
>

Re: Apache Flink:ProgramInvocationException on Yarn

Stephan Ewen
Flink uses Akka internally, and Akka requires an exact host/IP address to bind
to, which may explain why 0.0.0.0 fails. That may be the crash you are seeing.

Having the exact exception would help.
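
As a rough illustration of the binding issue (the address 10.0.0.5 and the jar path below are placeholders, not values from this thread), a standalone setup would point `jobmanager.rpc.address` at the JobManager machine's own routable address rather than `localhost` or `0.0.0.0`:

```yaml
# conf/flink-conf.yaml on the JobManager host (placeholder values).
# Use the host's own routable hostname or private IP, not "localhost"
# or "0.0.0.0", so that Akka can bind and remote clients can resolve it.
jobmanager.rpc.address: 10.0.0.5
jobmanager.rpc.port: 6123
```

The remote client would then pass that same address, e.g. `ExecutionEnvironment.createRemoteEnvironment("10.0.0.5", 6123, "/path/to/job.jar")`. On YARN this does not apply: the host and port are assigned per deployment and printed when the session starts, as noted earlier in the thread.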

On Sun, Aug 30, 2015 at 3:57 PM, Robert Metzger <[hidden email]> wrote:

> How is Flink crashing when you start it on the Linux machine in Amazon?
>
> Can you post the exception here?
>
> On Sun, Aug 30, 2015 at 3:48 PM, Alexey Sapozhnikov <[hidden email]>
> wrote:
>
> > Hello Stephan.
> >
> > We run this Linux machine on Amazon, which, I predict, most people will do.
> > We tried to put "0.0.0.0" or the public IP of the machine; Flink crashes on
> > start, it does not recognize itself.
> > It is very strange that it does not work with 0.0.0.0, since in Java that
> > is basically the way to bind on all interfaces.
> > We tried to put there the hostname (what you get from the "hostname"
> > command). It crashes.
> > It works only with "localhost", and then only locally.
> > So what do you suggest we put there so that the remote client could
> > connect?
> >
> > Best regards
> >
> > On Sun, Aug 30, 2015 at 4:34 PM, Stephan Ewen <[hidden email]> wrote:
> >
> > > Do you start Flink via YARN? In that case the "jobmanager.rpc.address" is
> > > not used, because YARN assigns containers/nodes.
> > >
> > > If you start Flink in "standalone" mode, this should be the address of the
> > > node that runs the JobManager. It will be used as the host/IP that Flink
> > > binds to. The same host should then be used in the RemoteEnvironment.
> > >
> > > Stephan
> > >
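For reference, in standalone mode the address Stephan mentions is set in conf/flink-conf.yaml; a sketch of the relevant entries, with 1.2.3.4 standing in for the JobManager node's externally reachable address:

```yaml
# conf/flink-conf.yaml (standalone mode; 1.2.3.4 is a placeholder)
# Remote clients must be able to reach this address, so "localhost"
# only works for clients running on the same machine.
jobmanager.rpc.address: 1.2.3.4
jobmanager.rpc.port: 6123
```

The JobManager has to be restarted after changing these values for them to take effect.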
> > >
> > > On Sun, Aug 30, 2015 at 3:25 PM, Hanan Meyer <[hidden email]>
> wrote:
> > >
> > > > Hello all.
> > > >
> > > > Firstly, thank you for your valuable advice.
> > > > We ran some very fine-tuned pinpoint tests and came to the following
> > > > conclusions:
> > > >
> > > > 1. We run Flink for Hadoop 2.7 on Ubuntu 14.
> > > > 2. Once we copied our Java client program directly to the machine and ran
> > > > it directly there, it worked very well.
> > > > The program is
> > > >
> > > > .....
> > > >
> > > > ExecutionEnvironment envRemote =
> > > >     ExecutionEnvironment.createRemoteEnvironment("localhost", 6123,
> > > >         "/usr/local/HananTestProj.jar");
> > > >
> > > > DataSet<String> text = envRemote.fromElements(
> > > >     "Who's there?",
> > > >     "I think I hear them. Stand, ho! Who's there?");
> > > >
> > > > DataSet<Tuple2<String, Integer>> wordCounts = text
> > > >     .flatMap(new LineSplitter())
> > > >     .groupBy(0)
> > > >     .sum(1);
> > > >
> > > > wordCounts.print();
> > > >
> > > > public static class LineSplitter
> > > >         implements FlatMapFunction<String, Tuple2<String, Integer>> {
> > > >     public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
> > > >         for (String word : line.split(" ")) {
> > > >             out.collect(new Tuple2<String, Integer>(word, 1));
> > > >         }
> > > >     }
> > > > }
> > > >
> > > > .....
> > > > The program works fine.
> > > > 3. Now we are trying to run this program remotely, from a Windows machine,
> > > > where the first row looks different:
> > > > ExecutionEnvironment envRemote =
> > > >     ExecutionEnvironment.createRemoteEnvironment("1.2.3.4", 6123,
> > > >         "C:\\HananTestProj.jar");
> > > > where 1.2.3.4 is the IP address of the Flink machine.
> > > >
> > > > 4. We got an exception: JobManager at 1.2.3.4 can't be reached, bla bla bla.
> > > >
> > > > 5. In the Flink configuration we found the following line:
> > > > jobmanager.rpc.address: localhost
> > > > Flink can't be started with any other value (hostname/IP address) except
> > > > localhost.
> > > >
> > > > 6. We assume that Flink probably has a critical bug: it can't be reached
> > > > from a remote machine, only locally. Are we right? Are we wrong? Should we
> > > > file a JIRA issue?
> > > > Maybe we need to configure Flink differently?
> > > >
> > > > Please advise.
> > > > Best regards
> > > >
> > > >
> > > >
> > > > On Sun, Aug 30, 2015 at 3:19 PM, Robert Metzger <[hidden email]
> >
> > > > wrote:
> > > >
> > > > > The output of the YARN session should look like this:
> > > > >
> > > > > Flink JobManager is now running on quickstart.cloudera:39956
> > > > > JobManager Web Interface:
> > > > > http://quickstart.cloudera:8088/proxy/application_1440768826963_0005/
> > > > > Number of connected TaskManagers changed to 1. Slots available: 1
> > > > >
> > > > >
> > > > >
> > > > >
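Since the port changes on every YARN deployment, a client has to pick the host and port out of that session output; a small sketch (the parsing logic is ours and assumes the exact log-line format quoted above):

```java
// Sketch: extract the JobManager host and port from the YARN session
// log line. The parsing is illustrative, not a Flink utility, and it
// assumes the message format shown in the thread.
public class YarnSessionOutput {

    static String[] jobManagerHostPort(String logLine) {
        String marker = "running on ";
        int i = logLine.indexOf(marker);
        if (i < 0) {
            return null; // not the line we are looking for
        }
        // e.g. "quickstart.cloudera:39956" -> {"quickstart.cloudera", "39956"}
        return logLine.substring(i + marker.length()).trim().split(":");
    }

    public static void main(String[] args) {
        String[] hp = jobManagerHostPort(
            "Flink JobManager is now running on quickstart.cloudera:39956");
        System.out.println(hp[0] + " / " + hp[1]);
    }
}
```

The extracted pair is what would then be passed to createRemoteEnvironment instead of the hard-coded 6123.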
> > > > > On Sun, Aug 30, 2015 at 11:12 AM, Stephan Ewen <[hidden email]>
> > > wrote:
> > > > >
> > > > > > The only thing I can think of is that you are not using the right
> > > > > > host/port for the JobManager.
> > > > > >
> > > > > > When you start the YARN session, it should print the host where the
> > > > > > JobManager runs. You also need to take the port from there, as in YARN
> > > > > > the port is usually not 6123. YARN starts many services on one machine,
> > > > > > so the ports need to be randomized.
> > > > > >
> > > > > > It may be worth adding a YARNExecutionEnvironment at some point, which
> > > > > > deals with this transparently (starting the YARN cluster, connecting to
> > > > > > the JobManager).
> > > > > >
> > > > > > On Sun, Aug 30, 2015 at 10:12 AM, Hanan Meyer <
> [hidden email]>
> > > > > wrote:
> > > > > >
> > > > > > > Hello.
> > > > > > > Let me clarify the situation.
> > > > > > > 1. We are using Flink 0.9.0 for Hadoop 2.7. We connected it to HDFS 2.7.1.
> > > > > > > 2. Locally, our program is working: once we run Flink via ./start-local.sh,
> > > > > > > we are able to connect and run the createRemoteEnvironment and execute
> > > > > > > methods.
> > > > > > > 3. Due to our architecture and a basic Flink feature, we want to invoke
> > > > > > > this functionality REMOTELY, with our Java code calling the Flink methods
> > > > > > > from another server.
> > > > > > > 4. We tried both
> > > > > > > ExecutionEnvironment.createRemoteEnvironment("1.2.3.1", 6123, "TestProj.jar");
> > > > > > > and
> > > > > > > ExecutionEnvironment.createRemoteEnvironment("flink@1.2.3.1", 6123, "TestProj.jar");
> > > > > > > (the latter is definitely not right, since it should be an IP address).
> > > > > > > Both crash with the "can't reach JobManager" error.
> > > > > > >
> > > > > > > It seems to us that it can be one of two issues:
> > > > > > > 1. Somehow we need to configure Flink to accept connections from the
> > > > > > > remote machine.
> > > > > > > 2. Flink has a critical showstopper bug that jeopardizes the whole
> > > > > > > decision to use this technology.
> > > > > > >
> > > > > > > Please advise us how we should proceed.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Aug 28, 2015 at 10:27 AM, Robert Metzger <
> > > > [hidden email]>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > in the exception you've posted earlier, you can see the following
> > > > > > > > root cause:
> > > > > > > >
> > > > > > > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > > > > > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> > > > > > > > Path(/user/jobmanager)]
> > > > > > > >
> > > > > > > > This string "akka.tcp://flink@FLINK_SERVER_URL:6123/" usually looks
> > > > > > > > like this: "akka.tcp://flink@1.2.3.4:6123/". So it seems that you are
> > > > > > > > passing the literal FLINK_SERVER_URL as the server hostname (or IP).
> > > > > > > > Can you pass the correct hostname when you call
> > > > > > > > ExecutionEnvironment.createRemoteEnvironment()?
> > > > > > > >
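A quick client-side sanity check, before calling createRemoteEnvironment, is whether the name about to be passed even resolves (the helper is ours, plain JDK, not part of the Flink API):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Sketch: verify that the JobManager hostname resolves before handing
// it to ExecutionEnvironment.createRemoteEnvironment. Helper is ours.
public class HostCheck {

    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            // A leftover placeholder like "FLINK_SERVER_URL" lands here.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolves("localhost")); // true
    }
}
```

Note that resolving is necessary but not sufficient: the resolved address must also be the one the JobManager actually binds to, which is what the rest of this thread is about.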
> > > > > > > > On Fri, Aug 28, 2015 at 7:52 AM, Hanan Meyer <
> > [hidden email]
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > > I'm currently using Flink 0.9.0, which via Maven supports Hadoop 1.
> > > > > > > > > Using flink-clients-0.7.0-hadoop2-incubating.jar with the
> > > > > > > > > executePlan(Plan p) method instead, I'm getting the same exception.
> > > > > > > > >
> > > > > > > > > Hanan
> > > > > > > > >
> > > > > > > > > On Fri, Aug 28, 2015 at 8:35 AM, Hanan Meyer <
> > > [hidden email]
> > > > >
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > 1. I have restarted the Flink service via stop/start-local.sh; it
> > > > > > > > > > restarted successfully, with no errors in the log folder.
> > > > > > > > > > 2. The default Flink port is 6123.
> > > > > > > > > >
> > > > > > > > > > Getting this via the Eclipse IDE:
> > > > > > > > > >
> > > > > > > > > > Thanks
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > org.apache.flink.client.program.ProgramInvocationException: Failed to resolve JobManager
> > > > > > > > > >     at org.apache.flink.client.program.Client.run(Client.java:379)
> > > > > > > > > >     at org.apache.flink.client.program.Client.run(Client.java:356)
> > > > > > > > > >     at org.apache.flink.client.program.Client.run(Client.java:349)
> > > > > > > > > >     at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
> > > > > > > > > >     at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
> > > > > > > > > >     at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
> > > > > > > > > >     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> > > > > > > > > >     at Test.main(Test.java:39)
> > > > > > > > > > Caused by: java.io.IOException: JobManager at
> > > > > > > > > > akka.tcp://flink@FLINK_SERVER_URL:6123/user/jobmanager not reachable.
> > > > > > > > > > Please make sure that the JobManager is running and its port is reachable.
> > > > > > > > > >     at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1197)
> > > > > > > > > >     at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1221)
> > > > > > > > > >     at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1239)
> > > > > > > > > >     at org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
> > > > > > > > > >     at org.apache.flink.client.program.Client.run(Client.java:376)
> > > > > > > > > >     ... 7 more
> > > > > > > > > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > > > > > > > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/), Path(/user/jobmanager)]
> > > > > > > > > >     at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> > > > > > > > > >     at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> > > > > > > > > >     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > > > > > > > > >     at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
> > > > > > > > > >     at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
> > > > > > > > > >     at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > > > > > > > >     at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > > > > > > > >     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> > > > > > > > > >     at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
> > > > > > > > > >     at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> > > > > > > > > >     at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
> > > > > > > > > >     at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> > > > > > > > > >     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> > > > > > > > > >     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> > > > > > > > > >     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
> > > > > > > > > >     at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
> > > > > > > > > >     at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
> > > > > > > > > >     at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
> > > > > > > > > >     at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
> > > > > > > > > >     at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
> > > > > > > > > >     at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
> > > > > > > > > >     at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
> > > > > > > > > >     at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> > > > > > > > > >     at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> > > > > > > > > >     at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> > > > > > > > > >     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> > > > > > > > > >     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> > > > > > > > > >     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
> > > > > > > > > >     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> > > > > > > > > >     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > > > > > > > > >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > > > > > > >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> > > > > > > > > >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> > > > > > > > > >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > > > > > > >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Aug 27, 2015 at 10:47 PM, Robert Metzger <
> > > > > > > [hidden email]>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> I guess you are getting an entire exception after the
> > > > > > > > > >> "org.apache.flink.client.program.ProgramInvocationException: Failed to
> > > > > > > > > >> resolve JobManager".
> > > > > > > > > >> Can you post it here to help us understand the issue?
> > > > > > > > > >>
> > > > > > > > > >> On Thu, Aug 27, 2015 at 6:55 PM, Alexey Sapozhnikov <
> > > > > > > > > [hidden email]>
> > > > > > > > > >> wrote:
> > > > > > > > > >>
> > > > > > > > > >> > Hello all.
> > > > > > > > > >> >
> > > > > > > > > >> > Some clarification: locally everything works great.
> > > > > > > > > >> > However, once we run our Flink on a remote Linux machine and try to
> > > > > > > > > >> > run the client program from our machine using createRemoteEnvironment,
> > > > > > > > > >> > the Flink JobManager raises this exception.
> > > > > > > > > >> >
> > > > > > > > > >> > On Thu, Aug 27, 2015 at 7:41 PM, Stephan Ewen <
> > > > > [hidden email]
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > > If you start the job via the "bin/flink" script, then simply use
> > > > > > > > > >> > > "ExecutionEnvironment.getExecutionEnvironment()" rather than
> > > > > > > > > >> > > creating a remote environment manually.
> > > > > > > > > >> > >
> > > > > > > > > >> > > That way, hosts and ports are configured automatically.