Hello all,
I am having some trouble getting nested iterations to work. The basic outline of my application looks like this:

1. create vertex dataset
2. create edge dataset
3. bulk iterate 100 times on edges {
   3a. Create graph from nodes and edges
   3b. Perform GatherSumApply (delta iteration)
   3c. Map Vertices
   3d. Perform GatherSumApply in other direction (again a delta iteration)
   3e. Join with edges on target
   3f. Output new edges
}
4. write edges to file

Am I correct in assuming that the two delta iterations (GSA) inside the bulk iteration are not allowed at this point in time? Or should I continue looking for bugs in my code? The stack trace doesn't help me all that much:

Exception in thread "main" java.lang.IllegalStateException
    at org.apache.flink.optimizer.dag.AbstractPartialSolutionNode.getAlternativePlans(AbstractPartialSolutionNode.java:86)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:342)
    at org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
    at org.apache.flink.optimizer.dag.BulkIterationNode.instantiateCandidate(BulkIterationNode.java:301)
    at org.apache.flink.optimizer.dag.SingleInputNode.addLocalCandidates(SingleInputNode.java:396)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:333)
    at org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:204)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:500)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:580)

Regards,
Pieter-Jan Van Aeken
Hello Pieter,
Nested iterations are indeed not supported in Flink:

http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3Cop.xw24u7fhf7e33m@vaio-sb%3E

The problem is not in your code.

On Tue, Aug 18, 2015 at 11:27 AM, Pieter-Jan Van Aeken <[hidden email]> wrote:
> Am I correct in assuming that the two delta iterations (GSA) inside the
> bulk iteration are not allowed at this point in time? Or should I continue
> looking for bugs in my code?
Hi,
Thanks for the quick response. I am thinking of a workaround right now, where I simply run the same application 100 times, but I would like to avoid as much duplicate work as possible. Is it possible to "broadcast" the nodes and edges datasets from one execution environment to the next without passing through a data sink to collect them into heap or write them to a file?

Regards,
Pieter-Jan Van Aeken

2015-08-18 12:11 GMT+02:00 Andra Lungu <[hidden email]>:
> Nested iterations are indeed not supported in Flink.
> The problem is not in your code.
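For readers following the workaround idea above, here is a minimal sketch of its control flow only: the outer loop is unrolled in the driver program, each pass runs an inner iteration to convergence, and the edges are handed to the next pass through a file. It is plain Java and makes no Flink calls. The class and method names, the temporary CSV file, the min-label propagation used as a stand-in for GatherSumApply, and the edge rewrite rule are all assumptions made for illustration, not code from this thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; none of these names come from Flink or the thread.
final class UnrolledOuterLoop {

    // Stand-in for one GSA run: spread the smallest vertex id along the
    // edges (in both directions) until no label changes any more.
    static Map<Long, Long> propagateMinLabel(List<long[]> edges) {
        Map<Long, Long> label = new HashMap<>();
        for (long[] e : edges) {
            label.putIfAbsent(e[0], e[0]);
            label.putIfAbsent(e[1], e[1]);
        }
        boolean changed = true;
        while (changed) { // inner iteration, runs to convergence
            changed = false;
            for (long[] e : edges) {
                long a = label.get(e[0]);
                long b = label.get(e[1]);
                long min = Math.min(a, b);
                if (a != min) { label.put(e[0], min); changed = true; }
                if (b != min) { label.put(e[1], min); changed = true; }
            }
        }
        return label;
    }

    // Reads "source,target" lines back into an edge list.
    static List<long[]> read(Path file) throws IOException {
        List<long[]> edges = new ArrayList<>();
        for (String line : Files.readAllLines(file)) {
            if (line.isEmpty()) continue;
            String[] p = line.split(",");
            edges.add(new long[] { Long.parseLong(p[0]), Long.parseLong(p[1]) });
        }
        return edges;
    }

    // Writes one "source,target" line per edge.
    static void write(Path file, List<long[]> edges) throws IOException {
        List<String> lines = new ArrayList<>();
        for (long[] e : edges) lines.add(e[0] + "," + e[1]);
        Files.write(file, lines);
    }

    // Outer loop, unrolled in the driver: every pass is an independent
    // "job" that reads the previous pass's edges and writes new ones.
    static List<long[]> run(List<long[]> initial, int passes) {
        try {
            Path file = Files.createTempFile("edges", ".csv");
            try {
                write(file, initial);
                for (int i = 0; i < passes; i++) {
                    List<long[]> edges = read(file);
                    Map<Long, Long> label = propagateMinLabel(edges);
                    List<long[]> next = new ArrayList<>();
                    // Placeholder for "output new edges": re-point each
                    // edge's source at the label computed for it.
                    for (long[] e : edges) next.add(new long[] { label.get(e[0]), e[1] });
                    write(file, next);
                }
                return read(file);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `UnrolledOuterLoop.run(edges, 100)` would mirror the 100 outer passes from the original outline. The file hand-off is exactly the sink round-trip the question hopes to avoid; the sketch only shows that baseline and does not answer whether datasets can be passed between execution environments directly.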
Hi Pieter!
The pattern you need is common and we have code for such loops in the works. It is being delayed right now due to work on fault-tolerance / high availability, but we plan to resume with that after those issues are done.

Greetings,
Stephan

On Tue, Aug 18, 2015 at 12:38 PM, Pieter-Jan Van Aeken <[hidden email]> wrote:
> Is it possible to "broadcast" nodes and edges datasets from one execution
> environment to the next without passing through a data sink to collect
> them into heap or write them to a file?