Dear ladies and gentlemen,

I have a problem using Gelly in Flink. Currently I am loading a Virtuoso graph into Flink's Gelly and I want to analyze it for the different paths one can take to link the different nodes. For that I am using a ScatterGatherIteration. However, my code only works with about ten to twenty nodes. When I try to load a hundred nodes, the following error occurs:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 1 maxPartition: 431 number of overflow segments: 0 bucketSize: 251 Overall memory: 45613056 Partition memory: 33685504 Message: null
	at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
	at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:392)
	at org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector.collect(SolutionSetUpdateOutputCollector.java:54)
	at org.apache.flink.graph.spargel.GatherFunction.setNewVertexValue(GatherFunction.java:123)
	at org.apache.flink.quickstart.PathRank$PathUpdateFunction.updateVertex(PathRank.java:357)
	at org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
	at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
	at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
	at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)

I tried to google it a bit, and this problem seems to occur often when using Gelly. I hope you have some ideas or approaches for how I can handle this error.

Thank you in advance!
All the best,
Dennis
Hi Dennis,
can you give us some details about your setup? e.g. where you are running your job, your input size, the configured memory, etc. It would also be helpful if you could share your code. Getting an out of memory error with just 100 nodes seems weird.

Best,
-Vasia.
Hi Vasia,
thanks for your reply.

Currently I am testing it on my normal workstation (16 GB RAM), but I also tried it on our cluster. Both fail at the same number of nodes, so I guess it has something to do with Gelly or with the properties.

The configured memory is the default. I did not change it because I thought that Flink is not the problem, but I might be wrong.

The input should not be much... I wrote an API for Virtuoso which requests an RDF graph, but I limited it to 10 data sets only.

This is my code; it is a bit messy and there is certainly room for improvement:

    public static final class PathMessageFunction
            extends ScatterFunction<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>, List<String>, String> {

        @Override
        public void sendMessages(
                Vertex<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>> vertex)
                throws Exception {

            // The list "path" collects the IDs of the vertices a message was sent to.
            List<String> path = new ArrayList<String>();
            if (super.getSuperstepNumber() == 1) {
                path.add(vertex.getId());
            }
            if (super.getSuperstepNumber() > 1) {
                for (String values : vertex.f1.f1.get(super.getSuperstepNumber() - 1)) {
                    path.add(values + ";" + vertex.getId());
                }
            }

            // The path list is sent to the neighbouring nodes.
            for (Edge<String, String> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), path);
            }
        }
    }

    public static final class PathUpdateFunction
            extends GatherFunction<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>, List<String>> {

        @Override
        public void updateVertex(
                Vertex<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>> vertex,
                MessageIterator<List<String>> messenger)
                throws Exception {

            List<String> newValues = new ArrayList<String>();

            // The path lists that arrived as messages are also stored within the vertex value,
            // so the paths are copied into a new list "newValues".
            // This list should not contain the ID of the vertex itself, to avoid cycles.
            for (List<String> msg : messenger) {
                for (String value : msg) {
                    if (!value.contains(vertex.getId())) {
                        newValues.add(value);
                    }
                }
            }

            // Create a new HashMap with the new and old values for the setNewVertexValue call.
            HashMap<Integer, List<String>> newHashMap = vertex.f1.f1;
            newHashMap.put(super.getSuperstepNumber(), newValues);

            HashMap<String, List<String>> multiPaths = new HashMap<String, List<String>>();

            // Here it gets a bit complicated... I try to analyze the given paths for possible combinations.
            // For example: given the path "a;b;c" and the path "c;d;e", I predict that "a;b;c;d;e" should also be possible.
            // (oriList and destList are origin/destination lists defined elsewhere in the class.)
            for (int i = 0; i < oriList.size(); i++) {

                String oriTemp = oriList.get(i);
                String destTemp = destList.get(i);

                String oriDest = oriTemp + destTemp;

                List<String> tempList = new ArrayList<String>();
                List<String> setsWithOrigin = new ArrayList<String>();
                List<String> setsWithDestination = new ArrayList<String>();
                for (Entry<Integer, List<String>> entry : newHashMap.entrySet()) {
                    for (String value : entry.getValue()) {
                        if (value.contains(oriTemp)) {
                            setsWithOrigin.add(value);
                        }
                        if (value.contains(destTemp)) {
                            setsWithDestination.add(value);
                        }
                    }
                }
                for (String originIter : setsWithOrigin) {
                    for (String destinationIter : setsWithDestination) {
                        String concat = "";
                        if ((originIter.indexOf(oriTemp) == 0 && destinationIter.indexOf(destTemp) == 0)) {
                            // Reverse the destination path character by character.
                            String reverse = destinationIter;
                            if (destinationIter.length() > 1) {
                                reverse = "";
                                int d = destinationIter.length();
                                for (int a = 0; a < destinationIter.length(); a++) {
                                    reverse = reverse + destinationIter.substring(d - 1, d);
                                    d--;
                                }
                            }
                            concat = originIter + ";" + vertex.getId() + ";" + reverse;
                        }
                        if (isFormatValid(concat) && concat.length() > 0) {
                            if (!tempList.contains(concat)) {
                                tempList.add(concat);
                            }
                        }
                    }
                }
                multiPaths.put(oriDest, tempList);
            }

            // The combined paths are also saved into a HashMap which is additionally set as part of the vertex value.
            // Later the paths are filtered for redundancy.
            Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>> testTuple3 =
                    new Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>(multiPaths, newHashMap);

            setNewVertexValue(testTuple3);
        }
    }

Let me know if you need any further information.
Thanks in advance.

All the best,
Dennis
Hi Dennis,
The default configured memory is only 512 MB. If you have 16 GB but you don't let Flink know about it, it won't use it.
I see that your vertex values are Tuple2s of HashMaps of Lists, and I suspect these grow big.
If you're running the program from your IDE, make sure to add a VM option. If you're running the program from the command line, make sure to edit the flink-conf.yaml file. You can find the available configuration options and what they mean in the docs [1].

I hope this helps,
-Vasia.

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html
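P.S. Just to make the last point concrete, and purely as an illustration (the numbers below are placeholders for a single 16 GB machine, not recommendations): from the IDE you would add a JVM option such as -Xmx4g to the run configuration, and for a standalone setup the relevant flink-conf.yaml entries would look roughly like this:

    # flink-conf.yaml (placeholder values)
    jobmanager.heap.mb: 1024
    taskmanager.heap.mb: 8192
    # share of the TaskManager heap reserved for Flink-managed memory (hash tables, sorting)
    taskmanager.memory.fraction: 0.7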
Hi again,

I tried to change some configs and set the available amount of memory to 4096 MB, but there is no difference at all. Furthermore, I monitored the JVM's RAM usage and it does not go beyond 3 GB at all.

I still don't think that the memory is the problem. To me it looks like an internal problem with Gelly.

Best,
Dennis
Hi,
the exception

> java.lang.RuntimeException: Memory ran out. Compaction failed.

says that the hash table ran out of memory. Gelly is implemented on top of Flink's DataSet API, so this is rather a problem of the DataSet runtime than of Gelly.

I think Vasia is right about the memory configuration. Providing more memory is usually the solution to this problem. Another thing you can do is to use a hash table in JVM heap memory instead of managed memory. See the options to set the solution set to unmanaged memory in the docs [1].

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/libs/gelly.html#iterative-graph-processing
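P.S. A minimal sketch of what I mean, assuming the ScatterFunction/GatherFunction variant of the API that your stack trace shows and a graph variable called "graph" (names and the iteration count are placeholders; please double-check the exact method signature against the Gelly version you are on):

    // ScatterGatherConfiguration lives in org.apache.flink.graph.spargel.
    // setSolutionSetUnmanagedMemory(true) keeps the solution set in an object-based
    // hash table on the JVM heap instead of the managed-memory CompactingHashTable.
    ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
    parameters.setSolutionSetUnmanagedMemory(true);

    Graph<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>, String> result =
            graph.runScatterGatherIteration(
                    new PathMessageFunction(), new PathUpdateFunction(), maxIterations, parameters);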
Hi Fabian,

thanks for your reply. I guess I will have to change something in my code in general to keep the hash tables rather small.

Have a nice weekend!

Best,
Dennis