Flink Gelly

Flink Gelly

dfki

Dear ladies and gentlemen,
 
I have a problem using Gelly in Flink. I am currently loading a Virtuoso graph into
Flink's Gelly and want to analyze the different paths one can take to link
the nodes. For this I am using the ScatterGatherIteration.
However, my code only works with about ten to twenty nodes. When I try to load
a hundred nodes, the following error occurs:
 
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 1 maxPartition: 431 number of overflow segments: 0 bucketSize: 251 Overall memory: 45613056 Partition memory: 33685504 Message: null
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:392)
at org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector.collect(SolutionSetUpdateOutputCollector.java:54)
at org.apache.flink.graph.spargel.GatherFunction.setNewVertexValue(GatherFunction.java:123)
at org.apache.flink.quickstart.PathRank$PathUpdateFunction.updateVertex(PathRank.java:357)
at org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
 
 
I googled it a bit, and this problem seems to occur often when using Gelly. I hope you have some ideas or approaches for handling this error.
 
Thank you in advance!
All the best,
Dennis 
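
The "Caused by" line of the trace reports the memory available to the hash table that backs the iteration's solution set. As a rough aid for reading those numbers, the following sketch converts the two byte counts from the message into MiB and into memory segments. The 32 KiB segment size is an assumption (it is Flink's default page size, but the thread does not state the configured value), and the class and method names are made up for illustration.

```java
public class HashTableMemory {
    // Assumed segment (page) size: 32 KiB, Flink's default; not confirmed in the thread.
    static final long ASSUMED_SEGMENT_BYTES = 32 * 1024;

    // Converts a byte count to MiB.
    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    // Converts a byte count to whole segments of the assumed size.
    static long toSegments(long bytes) {
        return bytes / ASSUMED_SEGMENT_BYTES;
    }

    public static void main(String[] args) {
        long overall = 45613056L;   // "Overall memory" from the exception message
        long partition = 33685504L; // "Partition memory" from the exception message
        System.out.println("overall:   " + toMiB(overall) + " MiB, " + toSegments(overall) + " segments");
        System.out.println("partition: " + toMiB(partition) + " MiB, " + toSegments(partition) + " segments");
    }
}
```

Under that assumption the table had about 43.5 MiB (1392 segments) in total, which suggests that the limit that was hit is the memory Flink assigned to this hash table rather than the machine's physical RAM.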

Re: Flink Gelly

Vasiliki Kalavri
Hi Dennis,

can you give us some details about your setup? e.g. where you are running
your job, your input size, the configured memory, etc. It would also be
helpful if you could share your code. Getting an out of memory error with
just 100 nodes seems weird.

Best,
-Vasia.


Re: Re: Flink Gelly

dfki
Hi Vasia,
 
thanks for your reply.
 
Currently I am testing it on my normal workstation (16 GB RAM), but I also tried it on our cluster.
Both fail at the same number of nodes, so I guess it has something to do with Gelly
or with the properties.
 
The configured memory is the default. I did not change it because I thought that Flink is not the problem,
but I might be wrong.
 
The input should not be large. I wrote an API for Virtuoso that requests an RDF graph, but
I limited it to 10 data sets only.
 
This is my code; it is a bit messy and there might be room for improvement:
 

public static final class PathMessageFunction
            extends
            ScatterFunction<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>, List<String>, String> {
        @Override
        public void sendMessages(
                Vertex<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>> vertex)
                throws Exception {
            
            // The list "path" collects the IDs of the vertices a message was sent to.
            
            List<String> path = new ArrayList<String>();
            if (super.getSuperstepNumber() == 1) {
                path.add(vertex.getId());
            }
            if (super.getSuperstepNumber() > 1) {
                for (String values : vertex.f1.f1.get(super.getSuperstepNumber() - 1)) {
                    path.add(values + ";" + vertex.getId());
                }
            }
    
            // The path list is sent to the neighbouring nodes.
            
            for (Edge<String, String> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), path);
            }
        }
    }
    
    
    
    public static final class PathUpdateFunction
            extends
            GatherFunction<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>, List<String>> {
        @Override
        public void updateVertex(
                Vertex<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>> vertex,
                MessageIterator<List<String>> messenger)
                throws Exception {
        
            List<String> newValues = new ArrayList<String>();
                        
            // The path lists received as messages are also stored in the vertex value, so the paths are collected in a new list "newValues".
            // This list must not contain paths that include the ID of the vertex itself, to avoid cycles.
            
            for (List<String> msg : messenger) {
                for (String value : msg) {
                    if (!value.contains(vertex.getId())) {
                        newValues.add(value);
                    }
                }
            }
            
            // Add the new values to the per-superstep HashMap for setNewVertexValue (this reuses the map from the vertex value rather than copying it).
                        
            HashMap<Integer, List<String>> newHashMap = vertex.f1.f1;
            newHashMap.put(super.getSuperstepNumber(), newValues);
            
            
            HashMap<String, List<String>> multiPaths = new HashMap<String, List<String>>();
            
            
            // Here it gets a bit complicated: I try to analyze the given paths for possible combinations of them.
            // For example, given the path "a;b;c" and the path "c;d;e", I predict that "a;b;c;d;e" should also be possible.
            // (oriList, destList and isFormatValid() are defined outside this snippet.)
            
            for (int i = 0; i < oriList.size(); i++) {
                
                String oriTemp = oriList.get(i);
                String destTemp = destList.get(i);
                
                String oriDest = oriTemp + destTemp;
                
                List<String> tempList = new ArrayList<String>();
                List<String> setsWithOrigin = new ArrayList<String>();
                List<String> setsWithDestination = new ArrayList<String>();
                for (Entry<Integer, List<String>> entry : newHashMap.entrySet()) {
                    for (String value : entry.getValue()) {
                        if (value.contains(oriTemp)) {
                            setsWithOrigin.add(value);
                        }
                        if (value.contains(destTemp)) {
                            setsWithDestination.add(value);
                        }
                    }
                }
                for (String originIter : setsWithOrigin) {
                    for (String destinationIter : setsWithDestination) {
                        String concat = "";
                        if ((originIter.indexOf(oriTemp) == 0 && destinationIter
                                .indexOf(destTemp) == 0)) {
                            // Reverse the destination path character by character.
                            String reverse = new StringBuilder(destinationIter).reverse().toString();
                            concat = originIter + ";" + vertex.getId() + ";"
                                    + reverse;
                        }
                        if (isFormatValid(concat) && concat.length() > 0) {
                            if (!tempList.contains(concat)) {
                                tempList.add(concat);
                            }
                        }
                    }
                }
                multiPaths.put(oriDest, tempList);
            }
                        
            
            // The combined paths are also saved in a HashMap, which is additionally set as part of the vertex value.
            // Later the paths are filtered for redundancy.
            
            Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>> testTuple3 = new Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>(
                    multiPaths, newHashMap);
            
            
            setNewVertexValue(testTuple3);
        }
    }
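
One detail in the snippet above may matter once vertex IDs are longer than a single character, as RDF resource names usually are: the "reverse" string is built by reversing the path character by character, and value.contains(vertex.getId()) matches substrings rather than whole IDs. The following standalone sketch, which assumes paths are plain ';'-separated ID strings as in the code above, shows a token-wise alternative. PathStrings, reversePath and containsId are hypothetical helper names, not part of Gelly or of the original program.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PathStrings {
    // Reverses the order of the IDs in a ';'-separated path, keeping each ID intact.
    static String reversePath(String path) {
        List<String> ids = new ArrayList<>(Arrays.asList(path.split(";")));
        Collections.reverse(ids);
        return String.join(";", ids);
    }

    // True only if one of the path's IDs equals the given ID exactly.
    static boolean containsId(String path, String id) {
        return Arrays.asList(path.split(";")).contains(id);
    }

    public static void main(String[] args) {
        // IDs stay readable: prints node3;node2;node1
        System.out.println(reversePath("node1;node2;node3"));
        // "node1" is only a prefix of "node10", so this prints false
        System.out.println(containsId("node10;node2", "node1"));
    }
}
```

With single-character IDs both approaches give the same result; they only differ for longer IDs, where a plain String.contains check would treat "node1" as part of "node10;node2".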
 
 
Let me know if you need any further information.
Thanks in advance.
 
All the best, 
Dennis  
 


Re: Re: Flink Gelly

Vasiliki Kalavri
Hi Dennis,

On 7 October 2016 at 11:29, <[hidden email]> wrote:

> Hi Vasia,
>
> thanks for your reply.
>
> Currently I am testing it on my normal workstation (16 GB RAM), but I also
> tried it on our cluster.
> Both fail at the same number of nodes, so I guess it has something
> to do with Gelly
> or with the properties.
>
> The configured memory is the default. I did not change it because I thought
> that Flink is not the problem,
> but I might be wrong.
>

The default configured memory is only 512 MB. If you have 16 GB but you
don't let Flink know about it, it won't use it.
I see that your vertex values are Tuple2s of HashMaps of Lists, and I
suspect these grow big.
If you're running the program from your IDE, make sure to add a VM option
that raises the JVM heap size (e.g. -Xmx).
If you're running the program from the command line, make sure to edit the
flink-conf.yaml file. You can find the available configuration options and
what they mean in the docs [1].

I hope this helps,
-Vasia.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html
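
To confirm that a VM option or configuration change actually reached the JVM running the job, one option is to print the heap limits from inside the program. This is a minimal sketch using only the standard java.lang.Runtime API; the class name HeapCheck is made up for illustration, and it reports the JVM heap only, not how much of it Flink assigns to managed memory.

```java
public class HeapCheck {
    // Converts a byte count to whole MiB.
    static long bytesToMiB(long bytes) {
        return bytes / (1024L * 1024L);
    }

    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        // maxMemory() is the upper bound the JVM will try to use (roughly the -Xmx value).
        System.out.println("max heap:   " + bytesToMiB(rt.maxMemory()) + " MiB");
        // totalMemory() is what the JVM has currently reserved; freeMemory() is the unused part of it.
        System.out.println("total heap: " + bytesToMiB(rt.totalMemory()) + " MiB");
        System.out.println("free heap:  " + bytesToMiB(rt.freeMemory()) + " MiB");
    }
}
```

Started with, for example, -Xmx4g, the first line should report a value in the region of 4096 MiB; if it still shows only a few hundred MiB, the option did not take effect for that run.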





Re: Re: Re: Flink Gelly

dfki


Hi again,
 
I changed some configs and set the amount of available memory to 4096 MB,
but it makes no difference at all. Furthermore, I monitored the RAM usage of
the JVM, and it does not go beyond 3 GB.

I still don't think that memory is the problem. To me it looks like an internal
problem with Gelly.

Best,
Dennis
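
Whether the vertex values themselves grow large can be estimated outside Flink. The sketch below mimics the message pattern of the scatter and gather functions posted earlier (each vertex forwards its newest paths, extended by its own ID, and drops paths that already contain its ID) on a small fully connected graph and counts the path strings a single vertex receives per superstep. The fully connected graph, the integer IDs and the class name PathGrowth are assumptions for illustration; the real RDF graph may be much sparser, so the numbers are not a prediction for the actual job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PathGrowth {
    // Returns, per superstep, how many path strings vertex 0 receives in a
    // fully connected directed graph with n vertices (IDs 0..n-1).
    static List<Integer> pathsPerSuperstep(int n, int supersteps) {
        // latest.get(v) holds the paths vertex v received in the previous superstep.
        List<List<String>> latest = new ArrayList<>();
        for (int v = 0; v < n; v++) {
            latest.add(new ArrayList<>());
        }
        List<Integer> counts = new ArrayList<>();
        for (int step = 1; step <= supersteps; step++) {
            List<List<String>> next = new ArrayList<>();
            for (int v = 0; v < n; v++) {
                next.add(new ArrayList<>());
            }
            for (int sender = 0; sender < n; sender++) {
                // Scatter: in step 1 send the own ID, later extend the newest paths by it.
                List<String> outgoing = new ArrayList<>();
                if (step == 1) {
                    outgoing.add(String.valueOf(sender));
                } else {
                    for (String path : latest.get(sender)) {
                        outgoing.add(path + ";" + sender);
                    }
                }
                for (int target = 0; target < n; target++) {
                    if (target == sender) {
                        continue;
                    }
                    // Gather: keep only paths that do not already contain the target's ID.
                    for (String path : outgoing) {
                        if (!Arrays.asList(path.split(";")).contains(String.valueOf(target))) {
                            next.get(target).add(path);
                        }
                    }
                }
            }
            latest = next;
            counts.add(latest.get(0).size());
        }
        return counts;
    }

    public static void main(String[] args) {
        // Six vertices, five supersteps: prints [5, 20, 60, 120, 120]
        System.out.println(pathsPerSuperstep(6, 5));
    }
}
```

Because the posted gather function keeps the lists of all supersteps in the vertex value, the stored total per vertex would be the sum of these counts (325 strings in this toy case), before the combined multiPaths entries are added on top.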
 

Gesendet: Freitag, 07. Oktober 2016 um 12:29 Uhr
Von: "Vasiliki Kalavri" <[hidden email]>
An: [hidden email]
Betreff: Re: Re: Flink Gelly
Hi Dennis,

On 7 October 2016 at 11:29, <[hidden email]> wrote:

> Hi Vasia,
>
> thanks for your reply.
>
> Currently I am testing it on my normal workstation (16GB Ram) but I also
> tried it on out cluster.
> Both are failing at the same amount of nodes, so I guess it has something
> to do with Gelly
> or with the properties.
>
> The configured memory is default. I did not change it because I thought
> that flink is not the problem
> but I might be wrong.
>

​The default configured​ memory is only 512MB. If you have 16GB but you
don't let Flink know about it, it won't use it.
I see that your vertex values are Tuple2's of HashMap's of Lists and I
suspect these grow big.
If you're running the program from your IDE make sure to add a VM option.
If you're running the program from the command line, make sure to edit the
flink-conf.yaml file. You can find the available configuration options and
what they mean in the docs [1].

​I hope this helps,
-Vasia.​

​[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html




>
> The Input should not be much... I wrote an API for Virtuoso which is
> requesting a RDF-graph. But
> I limited it to 10 Data Sets only.
>
> This is my code, it is a bit messy and their might be improvement:
>
>
> public static final class PathMessageFunction
> extends
> ScatterFunction<String, Tuple2<HashMap<String, List<String>>,
> HashMap<Integer, List<String>>>, List<String>, String> {
> @Override
> public void sendMessages(
> Vertex<String, Tuple2<HashMap<String, List<String>>,
> HashMap<Integer, List<String>>>> vertex)
> throws Exception {
>
> // The list "path" collects the ID's of the verticies a
> message was send to.
>
> List<String> path = new ArrayList<String>();
> if (super.getSuperstepNumber() == 1) {
> path.add(vertex.getId());
> }
> if (super.getSuperstepNumber() > 1) {
> for (String values : vertex.f1.f1.get(super.getSuperstepNumber()
> - 1)) {
> path.add(values + ";" + vertex.getId());
> }
> }
>
> // The Path-List is send to the next neighbouring Nodes.
>
> for (Edge<String, String> edge : getEdges()) {
> sendMessageTo(edge.getTarget(), path);
> }
> }
> }
>
>
>
> public static final class PathUpdateFunction
>         extends GatherFunction<String,
>                 Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>,
>                 List<String>> {
>
>     @Override
>     public void updateVertex(
>             Vertex<String, Tuple2<HashMap<String, List<String>>,
>                     HashMap<Integer, List<String>>>> vertex,
>             MessageIterator<List<String>> messenger)
>             throws Exception {
>
>         List<String> newValues = new ArrayList<String>();
>
>         // The path lists received as messages are also stored in the vertex
>         // value, so the paths are collected in the new list "newValues".
>         // Paths that already contain this vertex's ID are skipped to avoid cycles.
>         for (List<String> msg : messenger) {
>             for (String value : msg) {
>                 if (!value.contains(vertex.getId())) {
>                     newValues.add(value);
>                 }
>             }
>         }
>
>         // Add the new values to the map of old values (keyed by superstep)
>         // that is later passed to setNewVertexValue.
>         HashMap<Integer, List<String>> newHashMap = vertex.f1.f1;
>         newHashMap.put(getSuperstepNumber(), newValues);
>
>         HashMap<String, List<String>> multiPaths =
>                 new HashMap<String, List<String>>();
>
>         // Here it gets a bit complicated: I try to analyze the given paths
>         // for possible combinations. For example, given the paths "a;b;c"
>         // and "c;d;e", I predict that "a;b;c;d;e" should also be possible.
>         // (oriList, destList and isFormatValid are not part of this excerpt.)
>         for (int i = 0; i < oriList.size(); i++) {
>
>             String oriTemp = oriList.get(i);
>             String destTemp = destList.get(i);
>             String oriDest = oriTemp + destTemp;
>
>             List<String> tempList = new ArrayList<String>();
>             List<String> setsWithOrigin = new ArrayList<String>();
>             List<String> setsWithDestination = new ArrayList<String>();
>             for (Entry<Integer, List<String>> entry : newHashMap.entrySet()) {
>                 for (String value : entry.getValue()) {
>                     if (value.contains(oriTemp)) {
>                         setsWithOrigin.add(value);
>                     }
>                     if (value.contains(destTemp)) {
>                         setsWithDestination.add(value);
>                     }
>                 }
>             }
>             for (String originIter : setsWithOrigin) {
>                 for (String destinationIter : setsWithDestination) {
>                     String concat = "";
>                     if (originIter.indexOf(oriTemp) == 0
>                             && destinationIter.indexOf(destTemp) == 0) {
>                         // Reverse the destination path character by character.
>                         String reverse =
>                                 new StringBuilder(destinationIter).reverse().toString();
>                         concat = originIter + ";" + vertex.getId() + ";" + reverse;
>                     }
>                     if (isFormatValid(concat) && concat.length() > 0) {
>                         if (!tempList.contains(concat)) {
>                             tempList.add(concat);
>                         }
>                     }
>                 }
>             }
>             multiPaths.put(oriDest, tempList);
>         }
>
>         // The combined paths are also saved into a HashMap, which is set as
>         // part of the vertex value. Later the paths are filtered for redundancy.
>         Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>> testTuple3 =
>                 new Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>(
>                         multiPaths, newHashMap);
>
>         setNewVertexValue(testTuple3);
>     }
> }
>
>
> Let me know if you need any further information.
> Thanks in advance.
>
> All the best,
> Dennis
>
>
> Sent: Thursday, 06 October 2016 at 15:22
> From: "Vasiliki Kalavri" <[hidden email]>
> To: [hidden email]
> Subject: Re: Flink Gelly
> Hi Dennis,
>
> can you give us some details about your setup? e.g. where you are running
> your job, your input size, the configured memory, etc. It would also be
> helpful if you could share your code. Getting an out of memory error with
> just 100 nodes seems weird.
>
> Best,
> -Vasia.
>
> On 6 October 2016 at 13:29, <[hidden email]> wrote:
>
> >
> > Dear ladies and gentlemen,
> >
> > I have a problem using Gelly in Flink. Currently I am loading a Virtuoso
> > graph into Flink's Gelly, and I want to analyze it for the different paths
> > one can take to link the different nodes. Therefore I am using the
> > ScatterGatherIteration. However, my code only works with about ten to
> > twenty nodes. When I try to load a hundred nodes, the following error
> > occurs:
> >
> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> > at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> > at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> > at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 1 maxPartition: 431 number of overflow segments: 0 bucketSize: 251 Overall memory: 45613056 Partition memory: 33685504 Message: null
> > at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
> > at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:392)
> > at org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector.collect(SolutionSetUpdateOutputCollector.java:54)
> > at org.apache.flink.graph.spargel.GatherFunction.setNewVertexValue(GatherFunction.java:123)
> > at org.apache.flink.quickstart.PathRank$PathUpdateFunction.updateVertex(PathRank.java:357)
> > at org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> > at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> > at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> > at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> > at java.lang.Thread.run(Thread.java:745)
> >
> >
> > I tried to google it a bit, and this problem seems to occur often when
> > using Gelly. I hope you have some ideas or approaches for how I can handle
> > this error.
> >
> > Thank you in advance!
> > All the best,
> > Dennis
> >
>
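
A side note on the quoted PathUpdateFunction: it tests for cycles with value.contains(vertex.getId()) and reverses the destination path character by character. Both operate on raw substrings, so with vertex IDs longer than one character (which seems likely for IDs taken from an RDF graph) an ID "a" would also match inside "ab;c", and a reversed path would have its IDs spelled backwards. A minimal sketch of segment-wise alternatives is below. It assumes that IDs never contain the ";" separator; the class and method names (PathStrings, containsVertex, reversePath) are hypothetical and not part of the posted code or of Gelly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class PathStrings {

    // True only if vertexId is one of the ';'-separated IDs in path.
    static boolean containsVertex(String path, String vertexId) {
        return Arrays.asList(path.split(";")).contains(vertexId);
    }

    // Reverses the order of the IDs, leaving each ID itself unchanged.
    static String reversePath(String path) {
        List<String> ids = new ArrayList<>(Arrays.asList(path.split(";")));
        Collections.reverse(ids);
        return String.join(";", ids);
    }

    public static void main(String[] args) {
        System.out.println(containsVertex("ab;c", "a")); // false
        System.out.println(reversePath("n1;n2;n3"));     // n3;n2;n1
    }
}
```

Whether this changes the memory behaviour is a separate question; it only keeps the stored paths well-formed.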

Re: Re: Re: Flink Gelly

Fabian Hueske-2
Hi,

the exception
> java.lang.RuntimeException: Memory ran out. Compaction failed.

says that the hash table ran out of memory. Gelly is implemented on top of
Flink's DataSet API.
So this would rather be a problem with DataSet than Gelly.
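
To put numbers on this, the exception message itself reports "Overall memory: 45613056" and "Partition memory: 33685504", both in bytes. The sketch below only converts those two figures. The 32 KiB segment size is an assumption (it is, to my knowledge, Flink's default memory segment size), and the class name HashTableMemory is made up for illustration.

```java
import java.util.Locale;

final class HashTableMemory {

    // Assumption: 32 KiB memory segments.
    static final long SEGMENT_BYTES = 32L * 1024L;

    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    static long segments(long bytes) {
        return bytes / SEGMENT_BYTES;
    }

    public static void main(String[] args) {
        long overall = 45613056L;   // "Overall memory" from the exception
        long partition = 33685504L; // "Partition memory" from the exception
        System.out.printf(Locale.ROOT, "overall:   %.3f MiB, %d segments%n",
                toMiB(overall), segments(overall));
        System.out.printf(Locale.ROOT, "partition: %.3f MiB, %d segments%n",
                toMiB(partition), segments(partition));
    }
}
```

This prints 43.500 MiB (1392 segments) and 32.125 MiB (1028 segments). Read this way, the hash table had roughly 43.5 MiB to work with, so it can run out of memory long before the JVM as a whole gets anywhere near several gigabytes.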

I think Vasia is right about the memory configuration. Providing more
memory usually is the solution to this problem.
Another thing you can try is to keep the solution set's hash table in JVM
heap memory instead of Flink's managed memory.
See the option for setting the solution set to unmanaged memory in the docs [1].
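
As a minimal configuration sketch of that option (it needs the Gelly library on the classpath, and the wrapper class and method names here are made up for illustration), the iteration configuration would be created roughly like this:

```java
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;

final class UnmanagedSolutionSet {

    // Builds an iteration configuration that keeps the solution set
    // (the vertex values) in an ordinary heap hash table instead of
    // Flink's managed memory.
    static ScatterGatherConfiguration unmanagedSolutionSetConfig() {
        ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
        parameters.setSolutionSetUnmanagedMemory(true);
        return parameters;
    }
}
```

The resulting object is passed as the last argument of graph.runScatterGatherIteration(...), after the two user functions and the maximum number of iterations. With this setting the vertex values count against the JVM heap, so the heap itself has to be large enough to hold them.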

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/libs/gelly.html#iterative-graph-processing

2016-10-07 13:21 GMT+02:00 <[hidden email]>:

>
>
> Hi again,
>
> I tried to change some configs and set the amount of available memory to
> 4096 MB, but there is no difference at all. Furthermore, I monitored the
> JVM's RAM usage and it does not go beyond 3 GB.
>
> I still don't think that memory is the problem. To me it looks like an
> internal problem with Gelly.
>
> Best,
> Dennis
>
>
> Sent: Friday, 07 October 2016 at 12:29
> From: "Vasiliki Kalavri" <[hidden email]>
> To: [hidden email]
> Subject: Re: Re: Flink Gelly
> Hi Dennis,
>
> On 7 October 2016 at 11:29, <[hidden email]> wrote:
>
> > Hi Vasia,
> >
> > thanks for your reply.
> >
> > Currently I am testing it on my normal workstation (16 GB RAM), but I also
> > tried it on our cluster.
> > Both fail at the same number of nodes, so I guess it has something to do
> > with Gelly or with the properties.
> >
> > The configured memory is the default. I did not change it because I thought
> > that Flink was not the problem, but I might be wrong.
> >
>
> The default configured memory is only 512 MB. If you have 16 GB but you
> don't let Flink know about it, it won't use it.
> I see that your vertex values are Tuple2s of HashMaps of Lists, and I
> suspect these grow big.
> If you're running the program from your IDE, make sure to add a VM option.
> If you're running the program from the command line, make sure to edit the
> flink-conf.yaml file. You can find the available configuration options and
> what they mean in the docs [1].
>
> I hope this helps,
> -Vasia.
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html
>
>
>

Re: Re: Re: Re: Flink Gelly

dfki


Hi Fabian, 
 
thanks for your reply.
I guess I will have to change my code in general to keep
the hash tables rather small.

Have a nice weekend! 
Best,
Dennis
 

Sent: Friday, 07 October 2016 at 13:31
From: "Fabian Hueske" <[hidden email]>
To: "[hidden email]" <[hidden email]>
Subject: Re: Re: Re: Flink Gelly
Hi,

the exception
> java.lang.RuntimeException: Memory ran out. Compaction failed.

says that the hash table ran out of memory. Gelly is implemented on top of
Flink's DataSet API.
So this would rather be a problem with DataSet than Gelly.

I think Vasia is right about the memory configuration. Providing more
memory usually is the solution to this problem.
Another thing you can try is to keep the solution set's hash table in JVM
heap memory instead of Flink's managed memory.
See the option for setting the solution set to unmanaged memory in the docs [1].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/libs/gelly.html#iterative-graph-processing