Get Flink ExecutionGraph Programmatically

Chawla,Sumit
Hi all,

I am trying to get job accumulators. (I am aware that I can also get the
accumulators through the REST API, but I wanted to avoid JSON parsing.)

Looking at JobAccumulatorsHandler, I am trying to get the execution graph for
the currently running job. The following is my code:

    InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
    InetAddress ownHostname =
            ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);

    ActorSystem actorSystem = AkkaUtils.createActorSystem(
            configuration,
            new Some(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));

    FiniteDuration timeout = FiniteDuration.apply(10, TimeUnit.SECONDS);

    ActorGateway akkaActorGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
            LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
            actorSystem,
            timeout);

    Future<Object> future =
            akkaActorGateway.ask(new RequestJobDetails(true, false), timeout);

    MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
    ExecutionGraphHolder executionGraphHolder = new ExecutionGraphHolder(timeout);
    LOG.info(result.toString());
    for (JobDetails detail : result.getRunningJobs()) {
        LOG.info(detail.getJobName() + "  ID " + detail.getJobId());

        // This is the call that triggers the error shown below.
        ExecutionGraph executionGraph =
                executionGraphHolder.getExecutionGraph(detail.getJobId(), akkaActorGateway);
        LOG.info("Accumulators " + executionGraph.aggregateUserAccumulators());
    }


However, I am receiving the following error in Flink:

2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] nobody ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
java.io.NotSerializableException: org.apache.flink.runtime.checkpoint.CheckpointCoordinator
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_92]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
        at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) ~[akka-remote_2.10-2.3.7.jar:?]
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
        at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844) ~[akka-remote_2.10-2.3.7.jar:?]

Any reason why it is failing? This code works when invoked through
WebRuntimeMonitor.

Regards
Sumit Chawla
Re: Get Flink ExecutionGraph Programmatically

Chesnay Schepler-3
Hello,

This is a rather subtle issue you have stumbled upon.

The ExecutionGraph is not serializable. The only reason why the
WebInterface can access it is because it runs in the same JVM as the
JobManager.
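
To illustrate the difference, here is a minimal, self-contained Java sketch (plain JDK, not Flink code). The classes Graph and Coordinator are hypothetical stand-ins, built on the assumption suggested by the stack trace: a Serializable object that holds a field whose class is not Serializable. Handing over a reference inside one JVM needs no serialization, while Java serialization of the same object fails with a NotSerializableException that names the offending class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationSketch {

    /** Hypothetical stand-in for a runtime resource; deliberately NOT Serializable. */
    static class Coordinator {
        final Object lock = new Object();
    }

    /** Hypothetical stand-in for a graph: Serializable itself, but with a non-serializable field. */
    static class Graph implements Serializable {
        private static final long serialVersionUID = 1L;
        final String jobName = "demo";
        final Coordinator coordinator = new Coordinator();
    }

    /**
     * Tries to Java-serialize the message into an in-memory buffer.
     * Returns null on success, otherwise the exception message, which is
     * the name of the class that could not be serialized.
     */
    static String tryJavaSerialize(Object message) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(message);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Graph graph = new Graph();

        // Same JVM, plain reference: nothing is serialized, so this always works.
        Graph localView = graph;
        System.out.println("local reference is the same object: " + (localView == graph));

        // Crossing a serialization boundary: the non-serializable field breaks it.
        System.out.println("serialization failed on: " + tryJavaSerialize(graph));
    }
}
```

Marking such a field transient would let serialization succeed, but the receiver would then see the field as null, so that is not a general fix.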

I'm not sure there is a way to do what you are trying to do.

Regards,
Chesnay

On 21.09.2016 06:11, Chawla,Sumit wrote:

> [...]

Re: Get Flink ExecutionGraph Programmatically

Chawla,Sumit
Hi Chesnay,

I am actually running this code in the same JVM as the WebInterface and
JobManager. I start the JobManager programmatically and then run this code
in the same JVM to query metrics. The only difference could be that I am
creating a new Akka ActorSystem and ActorGateway. I am not sure whether that
forces the code to execute as if the request were coming over the wire. I am
not very familiar with Akka internals, so maybe somebody can shed some
light on it.

Regards
Sumit Chawla


On Wed, Sep 21, 2016 at 1:06 AM, Chesnay Schepler <[hidden email]>
wrote:

> [...]

Re: Get Flink ExecutionGraph Programmatically

amir bahmanyari
My only two cents: when I set the memory pre-allocation parameter to true and
started tuning the number of slots and buffers, I began to get all kinds of
Akka and "Disassociated" exceptions thrown by the JobManager regarding the
TaskManagers. Since I am also not very familiar with Akka internals, I went
back to my previous config and continued turning only the knobs that did not
cause Akka exceptions.

Thanks and regards,
Amir

      From: "Chawla,Sumit" <[hidden email]>
 To: [hidden email]
 Sent: Wednesday, September 21, 2016 11:08 AM
 Subject: Re: Get Flink ExecutionGraph Programmatically
   
[...]

Re: Get Flink ExecutionGraph Programmatically

Stephan Ewen
In reply to this post by Chawla,Sumit
Between two different actor systems in the same JVM, messages are still
serialized (they go through a local socket, I think).

Getting the execution graph is not easily possible, and not intended, as it
actually contains RPC resources, etc.

What do you need from the execution graph? Maybe there is another way to
achieve that...
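
For the accumulator use case from the original question, one alternative is the REST route mentioned there. The sketch below is an illustration under stated assumptions, not an official client: it assumes the web monitor serves accumulators under /jobs/<jobid>/accumulators (the path associated with JobAccumulatorsHandler in the Flink version discussed here) and that the caller supplies the host, web port, and job ID. The class and method names are made up for this example, and the response is returned as raw JSON text, so parsing is still left to a JSON library of your choice.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

class AccumulatorRestSketch {

    /** Builds the accumulators URL for one job (path is an assumption, see lead-in). */
    static String accumulatorsUrl(String host, int port, String jobId) {
        return "http://" + host + ":" + port + "/jobs/" + jobId + "/accumulators";
    }

    /** Performs a plain HTTP GET and returns the response body as text. */
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(10000);
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line).append('\n');
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 3) {
            System.err.println("usage: AccumulatorRestSketch <host> <webPort> <jobId>");
            return;
        }
        String url = accumulatorsUrl(args[0], Integer.parseInt(args[1]), args[2]);
        System.out.println(fetch(url));
    }
}
```

Because only serializable JSON text crosses the process boundary here, this route avoids the NotSerializableException entirely, at the cost of the JSON parsing the original poster wanted to avoid.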

On Wed, Sep 21, 2016 at 8:08 PM, Chawla,Sumit <[hidden email]>
wrote:

> [...]

Re: Get Flink ExecutionGraph Programmatically

Stephan Ewen
In reply to this post by amir bahmanyari
Memory pre-allocation is generally not a good idea unless you want to run
iterative batch jobs.

For example, when you run RocksDB, it needs its own memory pool, and Flink's
memory pool remains empty. In that case, pre-allocating memory just
"steals" it away from other important consumers.

On Wed, Sep 21, 2016 at 8:37 PM, amir bahmanyari <
[hidden email]> wrote:

> [...]
> apply
> >> (Endpoint.scala:845)
> >> ~[akka-remote_2.10-2.3.7.jar:?]
> >>          at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.
> apply
> >> (Endpoint.scala:845)
> >> ~[akka-remote_2.10-2.3.7.jar:?]
> >>          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:
> >> 57)
> >> ~[scala-library-2.10.5.jar:?]
> >>          at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:
> >> 844)
> >> ~[akka-remote_2.10-2.3.7.jar:?]
> >>
> >> Any reason why its failing? This code works when invoked through
> >> WebRuntimeMonitor.
> >>
> >> Regards
> >> Sumit Chawla
> >>
> >>
> >
>
>
>
>

Re: Get Flink ExecutionGraph Programmatically

Chawla,Sumit
In reply to this post by Stephan Ewen
Hi Sean

My goal here is to get the user accumulators.  I know the REST calls exist,
but since I am running my code in the same JVM, I wanted to avoid going over
HTTP.  I saw this code in JobAccumulatorsHandler and tried to use it.  Can
you suggest an alternative approach that avoids this over-the-network
serialization in Akka?

Regards
Sumit Chawla
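
For readers following the thread, the failure in the stack trace can be reproduced with plain Java serialization, which is what the Akka JavaSerializer in the trace delegates to. The sketch below is only an illustration under assumptions: Graph and Coordinator are hypothetical stand-ins, not Flink classes. It shows that an object which is itself Serializable still cannot be written if one non-transient field is not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {

    // Hypothetical stand-in for a runtime resource; it does NOT implement Serializable.
    static class Coordinator {
    }

    // Hypothetical stand-in for a graph object: Serializable itself, but it
    // holds a non-transient reference to a non-serializable member.
    static class Graph implements Serializable {
        private static final long serialVersionUID = 1L;
        final String jobName;
        final Coordinator coordinator = new Coordinator();

        Graph(String jobName) {
            this.jobName = jobName;
        }
    }

    // Tries to Java-serialize the message. Returns null on success, or the
    // exception message (the name of the offending class) on failure.
    static String firstNonSerializable(Object message) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(message);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        // A String serializes without problems.
        System.out.println(firstNonSerializable("plain string"));
        // The Graph fails because of its Coordinator field.
        System.out.println(firstNonSerializable(new Graph("demo")));
    }
}
```

A direct reference inside one JVM never goes through this step, which would explain why the same lookup works from the web monitor but fails once the reply has to pass between two actor systems.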


On Wed, Sep 21, 2016 at 11:37 AM, Stephan Ewen <[hidden email]> wrote:

> Between two different actor systems in the same JVM, messages are still
> serialized (they go through a local socket, I think).
>
> Getting the execution graph is not easily possible, and not intended, as it
> actually contains RPC resources, etc.
>
> What do you need from the execution graph? Maybe there is another way to
> achieve that...
>
> On Wed, Sep 21, 2016 at 8:08 PM, Chawla,Sumit <[hidden email]>
> wrote:
>
> > Hi Chesnay
> >
> > I am actually running this code in the same JVM as the WebInterface and
> > JobManager.  I start the JobManager programmatically and then run this
> > code in the same JVM to query metrics.  The only difference could be that
> > I am creating a new Akka ActorSystem and ActorGateway.  I am not sure
> > whether that forces the code to execute as if the request were coming
> > over the wire.  I am not very familiar with Akka internals, so maybe
> > somebody can shed some light on it.
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Wed, Sep 21, 2016 at 1:06 AM, Chesnay Schepler <[hidden email]>
> > wrote:
> >
> > > Hello,
> > >
> > > this is a rather subtle issue you stumbled upon here.
> > >
> > > The ExecutionGraph is not serializable. The only reason why the
> > > WebInterface can access it is because it runs in the same JVM as the
> > > JobManager.
> > >
> > > I'm not sure if there is a way for what you are trying to do.
> > >
> > > Regards,
> > > Chesnay
> > >
> > >
> > > On 21.09.2016 06:11, Chawla,Sumit wrote:
> > >
> > >> Hi All
> > >>
> > >>
> > >> I am trying to get JOB  accumulators.  ( I am aware that I can get the
> > >> accumulators through REST APIs as well, but i wanted to avoid JSON
> > >> parsing).
> > >>
> > >> Looking at JobAccumulatorsHandler i am trying to get execution graph
> for
> > >> currently running job.  Following is my code:
> > >>
> > >>    InetSocketAddress initialJobManagerAddress=new
> > >> InetSocketAddress(hostName,port);
> > >>              InetAddress ownHostname;
> > >>              ownHostname=
> > >> ConnectionUtils.findConnectingAddress(initialJobManagerAddress,2000,
> > 400);
> > >>
> > >>              ActorSystem actorSystem= AkkaUtils.createActorSystem(co
> > >> nfiguration,
> > >>                      new Some(new
> > >> Tuple2<String,Object>(ownHostname.getCanonicalHostName(),0)));
> > >>
> > >>              FiniteDuration timeout= FiniteDuration.apply(10,
> > >> TimeUnit.SECONDS);
> > >>
> > >>              ActorGateway akkaActorGateway=
> > >> LeaderRetrievalUtils.retrieveLeaderGateway(
> > >>
> > >> LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
> > >>                      actorSystem,timeout
> > >>              );
> > >>
> > >>
> > >>              Future<Object> future=akkaActorGateway.ask(new
> > >> RequestJobDetails(true,false),timeout);
> > >>
> > >>              MultipleJobsDetails result=(MultipleJobsDetails)
> > >> Await.result(future,timeout);
> > >>              ExecutionGraphHolder executionGraphHolder=new
> > >> ExecutionGraphHolder(timeout);
> > >>              LOG.info(result.toString());
> > >>              for(JobDetails detail:result.getRunningJobs()){
> > >>                  LOG.info(detail.getJobName() + "  ID " +
> > >> detail.getJobId());
> > >>
> > >> *                ExecutionGraph
> > >> executionGraph=executionGraphHolder.getExecutionGraph(detail.
> > getJobId(),
> > >> akkaActorGateway);*
> > >>
> > >>                 LOG.info("Accumulators " +
> > >> executionGraph.aggregateUserAccumulators());
> > >>              }
> > >>
> > >>
> > >> However, i am receiving following error in Flink:
> > >>
> > >> 2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3]
> nobody
> > >> ERROR akka.remote.EndpointWriter - Transient association error
> > >> (association
> > >> remains live)
> > >> java.io.NotSerializableException: org.apache.flink.runtime.
> checkpoint.
> > >> CheckpointCoordinator
> > >>          at java.io.ObjectOutputStream.writeObject0(
> ObjectOutputStream.
> > >> java:1184)
> > >> ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.defaultWriteFields(
> > ObjectOutputSt
> > >> ream.java:1548)
> > >> ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.writeSerialData(
> > ObjectOutputStrea
> > >> m.java:1509)
> > >> ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.writeOrdinaryObject(
> > ObjectOutputS
> > >> tream.java:1432)
> > >> ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.writeObject0(
> ObjectOutputStream.
> > >> java:1178)
> > >> ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.defaultWriteFields(
> > ObjectOutputSt
> > >> ream.java:1548)
> > >> ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.writeSerialData(
> > ObjectOutputStrea
> > >> m.java:1509)
> > >> ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.writeOrdinaryObject(
> > ObjectOutputS
> > >> tream.java:1432)
> > >> ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.writeObject0(
> ObjectOutputStream.
> > >> java:1178)
> > >> ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.writeObject(
> ObjectOutputStream.
> > >> java:348)
> > >> ~[?:1.8.0_92]
> > >>          at akka.serialization.JavaSerializer$$anonfun$
> > >> toBinary$1.apply$mcV$sp(Serializer.scala:129)
> > >> ~[akka-actor_2.10-2.3.7.jar:?]
> > >>          at akka.serialization.JavaSerializer$$anonfun$
> > >> toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
> > >>          at akka.serialization.JavaSerializer$$anonfun$
> > >> toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
> > >>          at scala.util.DynamicVariable.withValue(DynamicVariable.
> scala:
> > >> 57)
> > >> ~[scala-library-2.10.5.jar:?]
> > >>          at akka.serialization.JavaSerializer.toBinary(
> > Serializer.scala:
> > >> 129)
> > >> ~[akka-actor_2.10-2.3.7.jar:?]
> > >>          at akka.remote.MessageSerializer$
> > .serialize(MessageSerializer.s
> > >> cala:36)
> > >> ~[akka-remote_2.10-2.3.7.jar:?]
> > >>          at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.
> > apply
> > >> (Endpoint.scala:845)
> > >> ~[akka-remote_2.10-2.3.7.jar:?]
> > >>          at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.
> > apply
> > >> (Endpoint.scala:845)
> > >> ~[akka-remote_2.10-2.3.7.jar:?]
> > >>          at scala.util.DynamicVariable.withValue(DynamicVariable.
> scala:
> > >> 57)
> > >> ~[scala-library-2.10.5.jar:?]
> > >>          at akka.remote.EndpointWriter.serializeMessage(Endpoint.
> scala:
> > >> 844)
> > >> ~[akka-remote_2.10-2.3.7.jar:?]
> > >>
> > >> Any reason why its failing? This code works when invoked through
> > >> WebRuntimeMonitor.
> > >>
> > >> Regards
> > >> Sumit Chawla
> > >>
> > >>
> > >
> >
>

Re: Get Flink ExecutionGraph Programmatically

Aljoscha Krettek-2
Hi,
there is ClusterClient.getAccumulators(JobID jobID), which should be able to
get the accumulators for a running job. If you can construct a
ClusterClient, that should be a good solution.

Cheers,
Aljoscha
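
One way to see why a dedicated accumulator call can succeed where fetching the whole ExecutionGraph does not: only a small, serializable result has to cross the wire. The following is a minimal sketch in plain Java under that assumption. JobState and roundTrip are hypothetical names for illustration, and this is not how Flink implements the call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

public class AccumulatorSnapshotDemo {

    // Hypothetical job-side state: live counters that stay where the job runs.
    static class JobState {
        private final Map<String, Long> counters = new HashMap<>();

        void add(String name, long delta) {
            counters.merge(name, delta, Long::sum);
        }

        // Copies the current values into a plain HashMap, which is Serializable.
        HashMap<String, Long> snapshot() {
            return new HashMap<>(counters);
        }
    }

    // Simulates sending a reply over the wire: serialize to bytes, then read it back.
    @SuppressWarnings("unchecked")
    static Map<String, Long> roundTrip(HashMap<String, Long> reply)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(reply);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Map<String, Long>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        JobState state = new JobState();
        state.add("records-in", 3);
        state.add("records-in", 4);
        // Only the snapshot is serialized; JobState itself never leaves this side.
        System.out.println(roundTrip(state.snapshot()));
    }
}
```

The caller receives typed values in a map, so no JSON parsing is needed on the receiving side.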


Re: Get Flink ExecutionGraph Programmatically

Chawla,Sumit
Hi Aljoscha

I was able to get the ClusterClient and the accumulators using the following:

// Parse an empty command line, then retrieve the cluster client using the configuration.
DefaultCLI defaultCLI = new DefaultCLI();
CommandLine line = new DefaultParser().parse(new Options(), new String[]{}, true);
ClusterClient clusterClient = defaultCLI.retrieveCluster(line, configuration);



Regards
Sumit Chawla

