Nested iterations not supported?

Pieter-Jan Van Aeken
Hello all,


I am having some trouble getting nested iterations to work. The basic
outline of my application looks like this:

1. create vertex dataset
2. create edge dataset
3. bulk iterate 100 times on edges {
    3a. Create graph from nodes and edges
    3b. Perform GatherSumApply (delta iteration)
    3c. Map Vertices
    3d. Perform GatherSumApply in other direction (again a delta iteration)
    3e. Join with edges on target
    3f. Output new edges
}
4. write edges to file
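The outline above can be modeled in plain Scala (no Flink involved) to make the nesting explicit. A bulk iteration runs a fixed number of rounds, and each round contains two delta iterations that only re-process the elements changed in the previous superstep. This is a sketch under assumptions. The min-label propagation inside `deltaIterate` and the edge update at the end of each round are hypothetical stand-ins for the GatherSumApply steps and for steps 3e/3f. `NestedIterationModel`, `deltaIterate` and `outerLoop` are illustrative names, not Flink API.

```scala
object NestedIterationModel {
  // (source, target) pair; vertices are plain Int ids.
  type Edge = (Int, Int)

  // Delta (workset) iteration: `solution` holds one label per vertex and
  // `workset` holds the vertices whose label changed in the previous superstep.
  // Only those vertices propagate; the loop ends when the workset is empty
  // or after `maxIterations` supersteps.
  def deltaIterate(init: Map[Int, Int], edges: Seq[Edge], maxIterations: Int): Map[Int, Int] = {
    var solution = init
    var workset: Set[Int] = init.keySet
    var superstep = 0
    while (workset.nonEmpty && superstep < maxIterations) {
      // Changed vertices send their label along out-edges; keep the minimum per target.
      val candidates: Map[Int, Int] = edges
        .filter { case (s, _) => workset.contains(s) }
        .groupBy { case (_, t) => t }
        .map { case (t, incoming) => t -> incoming.map { case (s, _) => solution(s) }.min }
      // Only strict improvements update the solution set and form the next workset.
      val changed = candidates.filter { case (t, label) => label < solution(t) }
      solution = solution ++ changed
      workset = changed.keySet
      superstep += 1
    }
    solution
  }

  // Bulk iteration: a fixed number of rounds over the edge set. Each round runs
  // two delta iterations (cf. steps 3b and 3d) and then derives new edges.
  def outerLoop(vertices: Seq[Int], initialEdges: Seq[Edge], rounds: Int): Seq[Edge] = {
    var edges = initialEdges
    for (_ <- 1 to rounds) {
      val initialLabels = vertices.map(v => v -> v).toMap
      val forward = deltaIterate(initialLabels, edges, maxIterations = 20)      // cf. 3b
      val both = deltaIterate(forward, edges.map(_.swap), maxIterations = 20)   // cf. 3d
      // Hypothetical edge update (cf. 3e/3f): redirect each edge to its target's label.
      edges = edges.map { case (s, t) => (s, both(t)) }.distinct
    }
    edges
  }
}
```

In the DataSet API this shape would roughly be an `iterateDelta` call inside the step function passed to `iterate`; that nesting is what the optimizer rejects in the stack trace of this post.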

Am I correct in assuming that the two delta iterations (GSA) inside the
bulk iteration are not allowed at this point in time? Or should I continue
looking for bugs in my code? The stack trace doesn't help me all that much:

Exception in thread "main" java.lang.IllegalStateException
    at org.apache.flink.optimizer.dag.AbstractPartialSolutionNode.getAlternativePlans(AbstractPartialSolutionNode.java:86)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:342)
    at org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
    at org.apache.flink.optimizer.dag.BulkIterationNode.instantiateCandidate(BulkIterationNode.java:301)
    at org.apache.flink.optimizer.dag.SingleInputNode.addLocalCandidates(SingleInputNode.java:396)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:333)
    at org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:204)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:500)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:580)


Regards,

Pieter-Jan Van Aeken

Re: Nested iterations not supported?

Andra Lungu
Hello Pieter,

Nested iterations are indeed not supported in Flink.
http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3Cop.xw24u7fhf7e33m@vaio-sb%3E

The problem is not in your code.


On Tue, Aug 18, 2015 at 11:27 AM, Pieter-Jan Van Aeken wrote:

Re: Nested iterations not supported?

Pieter-Jan Van Aeken
Hi,

Thanks for the quick response. I am now thinking of a workaround where I
simply run the same application 100 times, but I would like to avoid as much
duplicate work as possible. Is it possible to "broadcast" the nodes and edges
datasets from one execution environment to the next without passing them
through a data sink, i.e. without collecting them into the heap or writing
them to a file?
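The workaround described above, running the job once per outer round, can be sketched in plain Scala. The former bulk iteration becomes a loop in the driver program, and each round is an independent run whose only link to the previous round is a materialized edge file. This is a minimal sketch under assumptions: the edge schema, the file naming, and the per-round `round` function are hypothetical, and `java.nio` file I/O stands in for a Flink file sink and CSV source. In an actual Flink program each round would build a new plan and end with its own `execute()` call.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}

object DriverLoop {
  // Hypothetical edge schema: (source, target).
  type Edge = (Long, Long)

  // Stand-in for a file sink: one "source,target" line per edge.
  def writeEdges(path: Path, edges: Seq[Edge]): Unit = {
    val text = edges.map { case (s, t) => s"$s,$t" }.mkString("\n")
    Files.write(path, text.getBytes(UTF_8))
  }

  // Stand-in for a CSV source that reads the previous round's output.
  def readEdges(path: Path): Seq[Edge] = {
    val text = new String(Files.readAllBytes(path), UTF_8)
    text.split("\n").toSeq.filter(_.nonEmpty).map { line =>
      val parts = line.split(",")
      (parts(0).toLong, parts(1).toLong)
    }
  }

  // The outer loop, unrolled in the driver: round i reads edges-i.csv,
  // applies `round`, and materializes the result as edges-(i + 1).csv.
  def run(initial: Seq[Edge], rounds: Int, dir: Path)(round: Seq[Edge] => Seq[Edge]): Seq[Edge] = {
    def file(i: Int): Path = dir.resolve(s"edges-$i.csv")
    writeEdges(file(0), initial)
    for (i <- 0 until rounds) {
      writeEdges(file(i + 1), round(readEdges(file(i))))
    }
    readEdges(file(rounds))
  }
}
```

For example, `DriverLoop.run(Seq((1L, 2L), (2L, 3L)), 3, dir)(_.map(_.swap))` runs three rounds that each reverse every edge. Every round pays for writing and re-reading the full edge set; collecting the result into the driver's heap instead avoids the files but pushes all data through the client.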


Regards,

Pieter-Jan Van Aeken

On 2015-08-18 at 12:11 GMT+02:00, Andra Lungu wrote:

Re: Nested iterations not supported?

Stephan Ewen
Hi Pieter!

The pattern you need is common, and we have code for such loops in the
works. It is currently delayed by the work on fault tolerance and high
availability, but we plan to resume it once those issues are done.

Greetings,
Stephan


On Tue, Aug 18, 2015 at 12:38 PM, Pieter-Jan Van Aeken wrote: