Extracting detailed Flink execution plan


Extracting detailed Flink execution plan

Amit Pawar
Hi

I am trying to extract/retrieve the Flink execution plan. I managed to get
it as a JSON string in the following ways:
1. From a JAR - via PackagedProgram's getPreviewPlan(); or
2. Directly in the program - via ExecutionEnvironment's getExecutionPlan()

My question is: is it possible to retrieve the Plan object directly?
I tried this but was not successful, as submitting the jar takes us into
interactive mode, and to use the other mode, programEntryPoint, the main
class needs to implement the Program interface with a getPlan() method.

Even if we manage to get the execution plan as a Plan object, will it be
different from what we get as a JSON string? For example, in terms of:
1. What datatypes are used in the dataset's tuples
2. On what key the join takes place
3. The filtering predicate
4. The field for distinct, and so on
(The JSON plan does have the operator tree, but its contents field points
to the line of code in the class, which is not that helpful.)

If not, is it possible (by some other means) to get the above details using
only the Flink job/jar as input?


Thanks and Regards
Amit Pawar

Re: Extracting detailed Flink execution plan

Stephan Ewen
Hi Amit!

The DataSet API is basically a fluent builder for the internal DAG of
operations, the "Plan". This plan is built when you call "env.execute()".

You can directly get the Plan by calling
ExecutionEnvironment#createProgramPlan().

The JSON plan additionally contains the information inserted by the
Optimizer (what partitioning to use where, which keys to use). This is
called the "OptimizedPlan".
To obtain that, you have to push the Plan through the Optimizer:
"OptimizedPlan op = new Optimizer(new DataStatistics(), new
DefaultCostEstimator()).compile(plan)"

That optimized plan has all the information needed for execution. The JSON
is created from that OptimizedPlan via "new
PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optimizedPlan)".

Note: these class names and instructions refer to Flink 0.9. For version
0.8, the names are a bit different.
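
Put together, the steps above look roughly like this (a sketch against the
Flink 0.9 API names used in this thread; it needs the Flink jars on the
classpath, and the package paths shown are my best guess, so verify them
against your version):

```java
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;

public class PlanExtraction {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // ... build your DataSet program on env here ...

        // 1. The raw program plan (what env.execute() would build internally)
        Plan plan = env.createProgramPlan();

        // 2. Push it through the optimizer to add partitioning, keys, strategies
        OptimizedPlan optimizedPlan =
                new Optimizer(new DataStatistics(), new DefaultCostEstimator())
                        .compile(plan);

        // 3. Dump the optimized plan as JSON
        String json = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optimizedPlan);
        System.out.println(json);
    }
}
```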

Greetings,
Stephan




Re: Extracting detailed Flink execution plan

Amit Pawar
Many thanks Stephan.
I followed your instructions, and it was working fine while I had the
required Flink projects in the build path. Later, when I substituted them
with the corresponding snapshot dependencies in the pom, I got the
exception below at

OptimizedPlan opPlan = op.compile(env.createProgramPlan());

Exception in thread "main" org.apache.flink.optimizer.CompilerException:
Class 'org.apache.flink.compiler.postpass.JavaApiPostPass' is not an
optimizer post-pass.
    at org.apache.flink.optimizer.Optimizer.getPostPassFromPlan(Optimizer.java:573)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
    at thesis.examples.SampleTest.main(SampleTest.java:189)
Caused by: java.lang.ClassCastException: class
org.apache.flink.compiler.postpass.JavaApiPostPass
    at java.lang.Class.asSubclass(Class.java:3208)
    at org.apache.flink.optimizer.Optimizer.getPostPassFromPlan(Optimizer.java:557)
    ... 2 more

I can work around that by keeping the necessary Flink projects in the
Eclipse build path, but then I face a different, Scala-related issue: a
NoSuchMethod exception on env.execute().

Please advise.

Thanks and Regards
Amit Pawar



Re: Extracting detailed Flink execution plan

Stephan Ewen
Hi!

What you describe sounds pretty much like a version mixup - NoSuchMethod
indicates one part of the code is out of sync with the other. Can you make
sure that you have all jars from the same Flink version in the classpath?

For the Optimizer Exception: The cause may be a similar issue (version
mixup) or a completely missing jar file. If you use the big jar file from
flink-dist in version 0.9, does that error occur?

Greetings,
Stephan



Re: Extracting detailed Flink execution plan

Amit Pawar
Thanks Stephan.
Using flink-dist jar solves the issue.


Thanks and Regards
Amit Pawar



Re: Extracting detailed Flink execution plan

Stephan Ewen
Okay, nice to hear!

Ping us if you run into other trouble...


Re: Extracting detailed Flink execution plan

Amit Pawar
Hi

For a given Operator<?>, it is easy to find the input and output
TypeInformation<?>, which gives the datatype and other information about
the dataset. For example:

if (operator instanceof SingleInputOperator) {
    inputTypes.add(((SingleInputOperator<?, ?, ?>) operator)
            .getOperatorInfo().getInputType());
}
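
To apply that check to every operator in the plan, one option is the plan's
visitor hook, since Plan is Visitable (a sketch assuming the Flink 0.9
Visitor API; the class name TypeCollector and the inputTypes list are my
own illustrative names):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.operators.DualInputOperator;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Visitor;

// Collects the input types of every operator reached while walking the plan.
// Usage: plan.accept(new TypeCollector());
public class TypeCollector implements Visitor<Operator<?>> {
    private final List<TypeInformation<?>> inputTypes = new ArrayList<TypeInformation<?>>();

    @Override
    public boolean preVisit(Operator<?> operator) {
        if (operator instanceof SingleInputOperator) {
            inputTypes.add(((SingleInputOperator<?, ?, ?>) operator)
                    .getOperatorInfo().getInputType());
        } else if (operator instanceof DualInputOperator) {
            DualInputOperator<?, ?, ?, ?> dual = (DualInputOperator<?, ?, ?, ?>) operator;
            inputTypes.add(dual.getOperatorInfo().getFirstInputType());
            inputTypes.add(dual.getOperatorInfo().getSecondInputType());
        }
        return true; // keep descending into this operator's inputs
    }

    @Override
    public void postVisit(Operator<?> operator) {
        // nothing to do on the way back up
    }

    public List<TypeInformation<?>> getInputTypes() {
        return inputTypes;
    }
}
```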

For a given DAG connection, we can determine the source and the target,
also in terms of Operators.

Is it possible to determine which dataset an operator is working on?

Thanks and Regards
Amit Pawar



Re: Extracting detailed Flink execution plan

Stephan Ewen
Hi!

It seems you are actually working on the Optimizer's representation of the
program. The optimizer (and the runtime as well) thinks about Flink
programs in terms of data flows, not data sets any more (those exist only
in the Java/Scala API).

The Java API operations get translated to the data flow operators, for
example in
"org.apache.flink.api.java.operators.SingleInputOperator#translateToDataFlow()".

Actually, even the Java API is in some sense only a fluent data flow
builder pattern. All DataSets are actually the results of operators, and
the concrete implementations of the abstract DataSet class are the
operator classes.

Greetings,
Stephan



On Mon, Apr 27, 2015 at 2:51 PM, Amit Pawar <[hidden email]> wrote:

> Hi
>
> For a given Operator<?>, it is easily possible to find the input and output
> TypeInformation<?>,which gives the datatype and other information of the
> dataset. For e.g.
>
> if(operator instanceof SingleInputOperator){
>
> inputTypes.add(((SingleInputOperator)operator).getOperatorInfo().getInputType());
> }
>
> For a given DAG connection, we can determine source and target, also in
> terms of Operator.
>
> Is it possible to determine on which dataset is the operator working on?
>
> Thanks and Regards
> Amit Pawar
>
>
> On Thu, Apr 23, 2015 at 3:01 PM, Stephan Ewen <[hidden email]> wrote:
>
> > Okay, nice to hear!
> >
> > Ping us if you run into other trouble...
> >
> > On Thu, Apr 23, 2015 at 2:28 PM, Amit Pawar <[hidden email]>
> > wrote:
> >
> > > Thanks Stephan.
> > > Using flink-dist jar solves the issue.
> > >
> > >
> > > Thanks and Regards
> > > Amit Pawar
> > >
> > >
> > > On Thu, Apr 23, 2015 at 2:02 PM, Stephan Ewen <[hidden email]>
> wrote:
> > >
> > > > Hi!
> > > >
> > > > What you describe sounds pretty much like a version mixup -
> > NoSuchMethod
> > > > indicates one part of the code is out of sync with the other. Can you
> > > make
> > > > sure that you have all jars from the same Flink version in the
> > classpath?
> > > >
> > > > For the Optimizer Exception: The cause may be a similar issue
> (version
> > > > mixup) or a completely missing jar file. If you use the big jar file
> > from
> > > > flink-dist in version 0.9, does that error occur?
> > > >
> > > > Greetings,
> > > > Stephan
> > > >
> > > >
> > > > On Thu, Apr 23, 2015 at 2:01 AM, Amit Pawar <[hidden email]
> >
> > > > wrote:
> > > >
> > > > > Many thanks Stephan.
> > > > > I followed your instructions and it was working fine when I had the
> > > > > required flink projects in the build path,
> > > > > later when I substituted it by adding respective dependencies with
> > the
> > > > > snapshots in pom, I am getting the below exception at
> > > > >
> > > > > OptimizedPlan opPlan = op.compile(env.createProgramPlan());
> > > > >
> > > > > Exception in thread "main"
> > > org.apache.flink.optimizer.CompilerException:
> > > > > Class 'org.apache.flink.compiler.postpass.JavaApiPostPass' is not
> an
> > > > > optimizer post-pass.
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.optimizer.Optimizer.getPostPassFromPlan(Optimizer.java:573)
> > > > > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
> > > > > at thesis.examples.SampleTest.main(SampleTest.java:189)
> > > > > Caused by: java.lang.ClassCastException: class
> > > > > org.apache.flink.compiler.postpass.JavaApiPostPass
> > > > > at java.lang.Class.asSubclass(Class.java:3208)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.optimizer.Optimizer.getPostPassFromPlan(Optimizer.java:557)
> > > > > ... 2 more
> > > > >
> > > > > I can work around that by having the necessary flink projects in
> > > > > eclipse/build path, but then I face  different issue of scala, No
> > such
> > > > > method exception on env.execute();
> > > > >
> > > > > Please advise.
> > > > >
> > > > > Thanks and Regards
> > > > > Amit Pawar
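For later readers, the end-to-end sequence discussed in this thread (capture the Plan, push it through the Optimizer, dump the OptimizedPlan as JSON) can be sketched as below. Package and class names follow Flink 0.9; earlier releases used `org.apache.flink.compiler` instead of `org.apache.flink.optimizer`, which is exactly the kind of version mixup behind the CompilerException above. Treat this as an untested sketch; the output path is made up.

```java
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;

public class PlanExtractionExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Any job definition; a sink is needed so the plan has a leaf.
        DataSet<Integer> data = env.fromElements(1, 2, 3);
        data.writeAsText("/tmp/plan-example-output"); // hypothetical path

        // Instead of env.execute(), capture the internal dataflow Plan ...
        Plan plan = env.createProgramPlan();

        // ... compile it to an OptimizedPlan, which carries the optimizer's
        // decisions (partitioning strategies, keys, cost estimates) ...
        OptimizedPlan optimizedPlan =
                new Optimizer(new DataStatistics(), new DefaultCostEstimator())
                        .compile(plan);

        // ... and render it as JSON if desired.
        String json = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optimizedPlan);
        System.out.println(json);
    }
}
```

All jars (flink-java, flink-optimizer, or simply the flink-dist fat jar) must come from the same Flink version, per Stephan's advice above.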

Re: Extracting detailed Flink execution plan

Amit Pawar
Thanks, Stephan.

So the follow-up question:
Is it possible to cast an operator from the optimizer/common API back to the
Java API? (The Java-to-common direction is possible via translateToDataFlow().)
For example, from
org.apache.flink.api.common.operators.base.JoinOperatorBase
to
org.apache.flink.api.java.operators.JoinOperator

JavaPlan, too, in turn gives us the list of common API operators.
Thanks for your help.

Thanks and Regards
Amit Pawar
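In case it helps later readers: the translation appears to be one-way, so rather than casting back to the Java API classes, one can branch on the common API types directly, which still carry the semantic information such as join keys. A rough, untested sketch (assuming the Flink 0.9 common operator API; the helper itself is hypothetical):

```java
import java.util.Arrays;

import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;

// Hypothetical helper: inspect a common-API operator without trying to
// cast it back up to the Java API layer.
public class OperatorInspector {

    public static void describe(Operator<?> operator) {
        if (operator instanceof JoinOperatorBase) {
            JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) operator;
            // getKeyColumns(i) returns the key fields of the i-th input
            System.out.println("Join of " + join.getFirstInput().getName()
                    + " and " + join.getSecondInput().getName()
                    + " on keys " + Arrays.toString(join.getKeyColumns(0))
                    + " == " + Arrays.toString(join.getKeyColumns(1)));
        }
    }
}
```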


On Mon, Apr 27, 2015 at 3:18 PM, Stephan Ewen <[hidden email]> wrote:

> Hi!
>
> It seems you are actually working on the Optimizer's representation of the
> program. The optimizer (and the runtime as well) thinks about Flink
> programs in terms of data flows, not data sets any more (that is only in
> the Java/Scala API).
>
> The Java API operations get translated to the data flow operators, for
> example in
> "org.apache.flink.api.java.operators.SingleInputOperator#translateToDataFlow()".
>
> Actually, even the Java API is in some sense only a fluent data flow
> builder pattern. All DataSets are actually the results of operators, and
> the concrete implementations of the abstract DataSet class are the
> operator classes.
>
> Greetings,
> Stephan
>
>
>
> On Mon, Apr 27, 2015 at 2:51 PM, Amit Pawar <[hidden email]>
> wrote:
>
> > Hi
> >
> > For a given Operator<?>, it is easily possible to find the input and
> > output TypeInformation<?>, which gives the datatype and other information
> > about the dataset. For example:
> >
> > if (operator instanceof SingleInputOperator) {
> >     inputTypes.add(((SingleInputOperator) operator).getOperatorInfo().getInputType());
> > }
> >
> > For a given DAG connection, we can determine source and target, also in
> > terms of Operators.
> >
> > Is it possible to determine which dataset the operator is working on?
> >
> > Thanks and Regards
> > Amit Pawar
> >
> >
> > On Thu, Apr 23, 2015 at 3:01 PM, Stephan Ewen <[hidden email]> wrote:
> >
> > > Okay, nice to hear!
> > >
> > > Ping us if you run into other trouble...
> > >
> > > On Thu, Apr 23, 2015 at 2:28 PM, Amit Pawar <[hidden email]>
> > > wrote:
> > >
> > > > Thanks Stephan.
> > > > Using flink-dist jar solves the issue.
> > > >
> > > >
> > > > Thanks and Regards
> > > > Amit Pawar
> > > >
> > > >
> > > > On Thu, Apr 23, 2015 at 2:02 PM, Stephan Ewen <[hidden email]>
> > wrote:
> > > >
> > > > > Hi!
> > > > >
> > > > > What you describe sounds pretty much like a version mixup -
> > > > > NoSuchMethod indicates one part of the code is out of sync with the
> > > > > other. Can you make sure that you have all jars from the same Flink
> > > > > version in the classpath?
> > > > >
> > > > > For the Optimizer Exception: The cause may be a similar issue
> > > > > (version mixup) or a completely missing jar file. If you use the
> > > > > big jar file from flink-dist in version 0.9, does that error occur?
> > > > >
> > > > > Greetings,
> > > > > Stephan
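To close the loop on the "which dataset is the operator working on" question: in the common API, the inputs of an operator are themselves operators, so the "dataset" an operator consumes is simply the output of the operator that produces it. A rough visitor-based sketch (Flink 0.9 class names, untested; assumes org.apache.flink.util.Visitor with preVisit/postVisit):

```java
import java.util.Arrays;

import org.apache.flink.api.common.operators.DualInputOperator;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.util.Visitor;

// Walks the dataflow DAG from the sinks and prints, for each operator,
// which operator produces its input(s), plus input types and key fields.
public class PlanWalker implements Visitor<Operator<?>> {

    @Override
    public boolean preVisit(Operator<?> operator) {
        if (operator instanceof SingleInputOperator) {
            SingleInputOperator<?, ?, ?> single = (SingleInputOperator<?, ?, ?>) operator;
            System.out.println(operator.getName()
                    + " consumes the output of " + single.getInput().getName()
                    + " (type " + single.getOperatorInfo().getInputType() + ")"
                    + ", key fields " + Arrays.toString(single.getKeyColumns(0)));
        } else if (operator instanceof DualInputOperator) {
            DualInputOperator<?, ?, ?, ?> dual = (DualInputOperator<?, ?, ?, ?>) operator;
            System.out.println(operator.getName()
                    + " consumes " + dual.getFirstInput().getName()
                    + " and " + dual.getSecondInput().getName());
        }
        return true; // keep descending towards the sources
    }

    @Override
    public void postVisit(Operator<?> operator) {
        // nothing to do on the way back up
    }
}
```

The walk would then be started from the captured plan, e.g. `env.createProgramPlan().accept(new PlanWalker())`.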