Hi everyone,
I ran a job this morning on 30 wally nodes. DOP 224. Worked like a charm. Then, I ran a similar job, on the exact same configuration, on the same input data set. The only difference between them is that the second job computes the degrees per vertex and, for vertices with degree higher than a user-defined threshold, it does a bit of magic(roughly a bunch of coGroups). The problem is that, even before the extra functions get called, I get the following type of exception: 06/19/2015 12:06:43 CHAIN FlatMap (FlatMap at fromDataSet(Graph.java:171)) -> Combine(Distinct at fromDataSet(Graph.java:171))(222/224) switched to FAILED java.lang.IllegalStateException: Update task on instance 29073fb0b0957198a2b67569b042d56b @ wally004 - 8 slots - URL: akka.tcp:// flink@130.149.249.14:44528/user/taskmanager failed due to: at org.apache.flink.runtime.executiongraph.Execution$5.onFailure(Execution.java:860) at akka.dispatch.OnFailure.internal(Future.scala:228) at akka.dispatch.OnFailure.internal(Future.scala:227) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:25) at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136) at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@130.149.249.14:44528/user/taskmanager#82700874]] after [100000 ms] at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333) at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) at java.lang.Thread.run(Thread.java:722) At first I thought, okay maybe wally004 is down; then I ssh'd into it. Works fine. The full output can be found here: https://gist.github.com/andralungu/d222b75cb33aea57955d Does anyone have any idea about what may have triggered this? :( Thanks! Andra |
Hi Andra,
the problem seems to be that the deployment of some tasks takes longer than 100s. From the stack trace it looks as if you're not using the latest master. We had problems with previous version where the deployment call waited for the TM to completely download the user code jars. For large setups the BlobServer became a bottleneck and some of the deployment calls timed out. We updated the deployment logic so that the TM sends an immediate ACK backt to the JM when it receives a new task. Could you verify which version of Flink you're running and in case that it's not the latest master, could you please try to run your example with the latest code? Cheers, Till On Fri, Jun 19, 2015 at 1:42 PM Andra Lungu <[hidden email]> wrote: > Hi everyone, > > I ran a job this morning on 30 wally nodes. DOP 224. Worked like a charm. > > Then, I ran a similar job, on the exact same configuration, on the same > input data set. The only difference between them is that the second job > computes the degrees per vertex and, for vertices with degree higher than a > user-defined threshold, it does a bit of magic(roughly a bunch of > coGroups). The problem is that, even before the extra functions get called, > I get the following type of exception: > > 06/19/2015 12:06:43 CHAIN FlatMap (FlatMap at > fromDataSet(Graph.java:171)) -> Combine(Distinct at > fromDataSet(Graph.java:171))(222/224) switched to FAILED > java.lang.IllegalStateException: Update task on instance > 29073fb0b0957198a2b67569b042d56b @ wally004 - 8 slots - URL: akka.tcp:// > flink@130.149.249.14:44528/user/taskmanager failed due to: > at > > org.apache.flink.runtime.executiongraph.Execution$5.onFailure(Execution.java:860) > at akka.dispatch.OnFailure.internal(Future.scala:228) > at akka.dispatch.OnFailure.internal(Future.scala:227) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171) > at > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > > scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:25) > at > scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136) > at > scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > at > > scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka.tcp://flink@130.149.249.14:44528/user/taskmanager#82700874]] > after [100000 ms] > at > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333) > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) > at > > scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) > at > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) > at > > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) > at > > akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) > at > > akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) > at > akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) > at java.lang.Thread.run(Thread.java:722) > > > At first I thought, okay maybe wally004 is down; then I ssh'd into it. > Works fine. > > The full output can be found here: > https://gist.github.com/andralungu/d222b75cb33aea57955d > > Does anyone have any idea about what may have triggered this? :( > > Thanks! > Andra > |
Yes, so I am using flink-0.9.0-milestone-1. Was it a problem for this
version? I'll just fetch the latest master if this is the case. On Fri, Jun 19, 2015 at 2:12 PM, Till Rohrmann <[hidden email]> wrote: > Hi Andra, > > the problem seems to be that the deployment of some tasks takes longer than > 100s. From the stack trace it looks as if you're not using the latest > master. > > We had problems with previous version where the deployment call waited for > the TM to completely download the user code jars. For large setups the > BlobServer became a bottleneck and some of the deployment calls timed out. > We updated the deployment logic so that the TM sends an immediate ACK backt > to the JM when it receives a new task. > > Could you verify which version of Flink you're running and in case that > it's not the latest master, could you please try to run your example with > the latest code? > > Cheers, > Till > > On Fri, Jun 19, 2015 at 1:42 PM Andra Lungu <[hidden email]> wrote: > > > Hi everyone, > > > > I ran a job this morning on 30 wally nodes. DOP 224. Worked like a charm. > > > > Then, I ran a similar job, on the exact same configuration, on the same > > input data set. The only difference between them is that the second job > > computes the degrees per vertex and, for vertices with degree higher > than a > > user-defined threshold, it does a bit of magic(roughly a bunch of > > coGroups). The problem is that, even before the extra functions get > called, > > I get the following type of exception: > > > > 06/19/2015 12:06:43 CHAIN FlatMap (FlatMap at > > fromDataSet(Graph.java:171)) -> Combine(Distinct at > > fromDataSet(Graph.java:171))(222/224) switched to FAILED > > java.lang.IllegalStateException: Update task on instance > > 29073fb0b0957198a2b67569b042d56b @ wally004 - 8 slots - URL: akka.tcp:// > > flink@130.149.249.14:44528/user/taskmanager failed due to: > > at > > > > > org.apache.flink.runtime.executiongraph.Execution$5.onFailure(Execution.java:860) > > at akka.dispatch.OnFailure.internal(Future.scala:228) > > at akka.dispatch.OnFailure.internal(Future.scala:227) > > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174) > > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171) > > at > > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > > at > > > > > scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:25) > > at > > scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136) > > at > > scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134) > > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > > at > > > > > scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) > > at > > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > at > > > > > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > at > > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > > at > > > > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > > [Actor[akka.tcp://flink@130.149.249.14:44528/user/taskmanager#82700874]] > > after [100000 ms] > > at > > > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333) > > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) > > at > > > > > scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) > > at > > > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) > > at > > > > > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) > > at > > > > > akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) > > at > > > > > akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) > > at > > akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) > > at java.lang.Thread.run(Thread.java:722) > > > > > > At first I thought, okay maybe wally004 is down; then I ssh'd into it. > > Works fine. > > > > The full output can be found here: > > https://gist.github.com/andralungu/d222b75cb33aea57955d > > > > Does anyone have any idea about what may have triggered this? :( > > > > Thanks! > > Andra > > > |
Yes, it was an issue for the milestone release.
On Fri, Jun 19, 2015 at 2:18 PM Andra Lungu <[hidden email]> wrote: > Yes, so I am using flink-0.9.0-milestone-1. Was it a problem for this > version? > I'll just fetch the latest master if this is the case. > > On Fri, Jun 19, 2015 at 2:12 PM, Till Rohrmann <[hidden email]> > wrote: > > > Hi Andra, > > > > the problem seems to be that the deployment of some tasks takes longer > than > > 100s. From the stack trace it looks as if you're not using the latest > > master. > > > > We had problems with previous version where the deployment call waited > for > > the TM to completely download the user code jars. For large setups the > > BlobServer became a bottleneck and some of the deployment calls timed > out. > > We updated the deployment logic so that the TM sends an immediate ACK > backt > > to the JM when it receives a new task. > > > > Could you verify which version of Flink you're running and in case that > > it's not the latest master, could you please try to run your example with > > the latest code? > > > > Cheers, > > Till > > > > On Fri, Jun 19, 2015 at 1:42 PM Andra Lungu <[hidden email]> > wrote: > > > > > Hi everyone, > > > > > > I ran a job this morning on 30 wally nodes. DOP 224. Worked like a > charm. > > > > > > Then, I ran a similar job, on the exact same configuration, on the same > > > input data set. The only difference between them is that the second job > > > computes the degrees per vertex and, for vertices with degree higher > > than a > > > user-defined threshold, it does a bit of magic(roughly a bunch of > > > coGroups). The problem is that, even before the extra functions get > > called, > > > I get the following type of exception: > > > > > > 06/19/2015 12:06:43 CHAIN FlatMap (FlatMap at > > > fromDataSet(Graph.java:171)) -> Combine(Distinct at > > > fromDataSet(Graph.java:171))(222/224) switched to FAILED > > > java.lang.IllegalStateException: Update task on instance > > > 29073fb0b0957198a2b67569b042d56b @ wally004 - 8 slots - URL: > akka.tcp:// > > > flink@130.149.249.14:44528/user/taskmanager failed due to: > > > at > > > > > > > > > org.apache.flink.runtime.executiongraph.Execution$5.onFailure(Execution.java:860) > > > at akka.dispatch.OnFailure.internal(Future.scala:228) > > > at akka.dispatch.OnFailure.internal(Future.scala:227) > > > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174) > > > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171) > > > at > > > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > > > at > > > > > > > > > scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:25) > > > at > > > scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136) > > > at > > > scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134) > > > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > > > at > > > > > > > > > scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) > > > at > > > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > > at > > > > > > > > > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > > at > > > > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > > > at > > > > > > > > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > > > [Actor[akka.tcp://flink@130.149.249.14:44528/user/taskmanager#82700874 > ]] > > > after [100000 ms] > > > at > > > > > > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333) > > > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) > > > at > > > > > > > > > scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) > > > at > > > > > > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) > > > at > > > > > > > > > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) > > > at > > > > > > > > > akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) > > > at > > > > > > > > > akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) > > > at > > > akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) > > > at java.lang.Thread.run(Thread.java:722) > > > > > > > > > At first I thought, okay maybe wally004 is down; then I ssh'd into it. > > > Works fine. > > > > > > The full output can be found here: > > > https://gist.github.com/andralungu/d222b75cb33aea57955d > > > > > > Does anyone have any idea about what may have triggered this? :( > > > > > > Thanks! > > > Andra > > > > > > |
Another problem that I encountered during the same set of experiments(sorry
if I am asking too many questions, I am eager to get things fixed): - for the same configuration, a piece of code runs perfectly on 10GB of input, then for 38GB it runs forever (no deadlock). I believe that may occur because Flink spills information to disk every time it runs out of memory... Is this fixable by increasing the number of buffers? That's the last question for today, promise :) Thanks! On Fri, Jun 19, 2015 at 2:40 PM, Till Rohrmann <[hidden email]> wrote: > Yes, it was an issue for the milestone release. > > On Fri, Jun 19, 2015 at 2:18 PM Andra Lungu <[hidden email]> wrote: > > > Yes, so I am using flink-0.9.0-milestone-1. Was it a problem for this > > version? > > I'll just fetch the latest master if this is the case. > > > > On Fri, Jun 19, 2015 at 2:12 PM, Till Rohrmann <[hidden email]> > > wrote: > > > > > Hi Andra, > > > > > > the problem seems to be that the deployment of some tasks takes longer > > than > > > 100s. From the stack trace it looks as if you're not using the latest > > > master. > > > > > > We had problems with previous version where the deployment call waited > > for > > > the TM to completely download the user code jars. For large setups the > > > BlobServer became a bottleneck and some of the deployment calls timed > > out. > > > We updated the deployment logic so that the TM sends an immediate ACK > > backt > > > to the JM when it receives a new task. > > > > > > Could you verify which version of Flink you're running and in case that > > > it's not the latest master, could you please try to run your example > with > > > the latest code? > > > > > > Cheers, > > > Till > > > > > > On Fri, Jun 19, 2015 at 1:42 PM Andra Lungu <[hidden email]> > > wrote: > > > > > > > Hi everyone, > > > > > > > > I ran a job this morning on 30 wally nodes. DOP 224. Worked like a > > charm. > > > > > > > > Then, I ran a similar job, on the exact same configuration, on the > same > > > > input data set. The only difference between them is that the second > job > > > > computes the degrees per vertex and, for vertices with degree higher > > > than a > > > > user-defined threshold, it does a bit of magic(roughly a bunch of > > > > coGroups). The problem is that, even before the extra functions get > > > called, > > > > I get the following type of exception: > > > > > > > > 06/19/2015 12:06:43 CHAIN FlatMap (FlatMap at > > > > fromDataSet(Graph.java:171)) -> Combine(Distinct at > > > > fromDataSet(Graph.java:171))(222/224) switched to FAILED > > > > java.lang.IllegalStateException: Update task on instance > > > > 29073fb0b0957198a2b67569b042d56b @ wally004 - 8 slots - URL: > > akka.tcp:// > > > > flink@130.149.249.14:44528/user/taskmanager failed due to: > > > > at > > > > > > > > > > > > > > org.apache.flink.runtime.executiongraph.Execution$5.onFailure(Execution.java:860) > > > > at akka.dispatch.OnFailure.internal(Future.scala:228) > > > > at akka.dispatch.OnFailure.internal(Future.scala:227) > > > > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174) > > > > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171) > > > > at > > > > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > > > > at > > > > > > > > > > > > > > scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:25) > > > > at > > > > scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136) > > > > at > > > > scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134) > > > > at > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > > > > at > > > > > > > > > > > > > > scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) > > > > at > > > > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > > > at > > > > > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > > > at > > > > > > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > > > > at > > > > > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > > > > [Actor[akka.tcp:// > flink@130.149.249.14:44528/user/taskmanager#82700874 > > ]] > > > > after [100000 ms] > > > > at > > > > > > > > > > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333) > > > > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) > > > > at > > > > > > > > > > > > > > scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) > > > > at > > > > > > > > > > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) > > > > at > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) > > > > at > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) > > > > at > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) > > > > at > > > > > akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) > > > > at java.lang.Thread.run(Thread.java:722) > > > > > > > > > > > > At first I thought, okay maybe wally004 is down; then I ssh'd into > it. > > > > Works fine. > > > > > > > > The full output can be found here: > > > > https://gist.github.com/andralungu/d222b75cb33aea57955d > > > > > > > > Does anyone have any idea about what may have triggered this? :( > > > > > > > > Thanks! > > > > Andra > > > > > > > > > > |
What does forever mean? Usually it's the case that you see a steep decline
in performance once the system starts spilling data to disk because of the disk I/O bottleneck. The system always starts spilling to disk when it has no more memory left for its operations. So for example if you want to sort data which cannot be kept completely in memory, then the system has to employ external sorting. If you can give Flink more memory, then you can avoid this problem depending on the actual data size. You can observe disk I/O by using the `iotop` command for example. If you see Flink having a high I/O usage, then this might be an indicator for spilling. Cheers, Till On Fri, Jun 19, 2015 at 2:54 PM Andra Lungu <[hidden email]> wrote: > Another problem that I encountered during the same set of experiments(sorry > if I am asking too many questions, I am eager to get things fixed): > - for the same configuration, a piece of code runs perfectly on 10GB of > input, then for 38GB it runs forever (no deadlock). > > I believe that may occur because Flink spills information to disk every > time it runs out of memory... Is this fixable by increasing the number of > buffers? > > That's the last question for today, promise :) > > Thanks! > > On Fri, Jun 19, 2015 at 2:40 PM, Till Rohrmann <[hidden email]> > wrote: > > > Yes, it was an issue for the milestone release. > > > > On Fri, Jun 19, 2015 at 2:18 PM Andra Lungu <[hidden email]> > wrote: > > > > > Yes, so I am using flink-0.9.0-milestone-1. Was it a problem for this > > > version? > > > I'll just fetch the latest master if this is the case. > > > > > > On Fri, Jun 19, 2015 at 2:12 PM, Till Rohrmann <[hidden email]> > > > wrote: > > > > > > > Hi Andra, > > > > > > > > the problem seems to be that the deployment of some tasks takes > longer > > > than > > > > 100s. From the stack trace it looks as if you're not using the latest > > > > master. > > > > > > > > We had problems with previous version where the deployment call > waited > > > for > > > > the TM to completely download the user code jars. For large setups > the > > > > BlobServer became a bottleneck and some of the deployment calls timed > > > out. > > > > We updated the deployment logic so that the TM sends an immediate ACK > > > backt > > > > to the JM when it receives a new task. > > > > > > > > Could you verify which version of Flink you're running and in case > that > > > > it's not the latest master, could you please try to run your example > > with > > > > the latest code? > > > > > > > > Cheers, > > > > Till > > > > > > > > On Fri, Jun 19, 2015 at 1:42 PM Andra Lungu <[hidden email]> > > > wrote: > > > > > > > > > Hi everyone, > > > > > > > > > > I ran a job this morning on 30 wally nodes. DOP 224. Worked like a > > > charm. > > > > > > > > > > Then, I ran a similar job, on the exact same configuration, on the > > same > > > > > input data set. The only difference between them is that the second > > job > > > > > computes the degrees per vertex and, for vertices with degree > higher > > > > than a > > > > > user-defined threshold, it does a bit of magic(roughly a bunch of > > > > > coGroups). The problem is that, even before the extra functions get > > > > called, > > > > > I get the following type of exception: > > > > > > > > > > 06/19/2015 12:06:43 CHAIN FlatMap (FlatMap at > > > > > fromDataSet(Graph.java:171)) -> Combine(Distinct at > > > > > fromDataSet(Graph.java:171))(222/224) switched to FAILED > > > > > java.lang.IllegalStateException: Update task on instance > > > > > 29073fb0b0957198a2b67569b042d56b @ wally004 - 8 slots - URL: > > > akka.tcp:// > > > > > flink@130.149.249.14:44528/user/taskmanager failed due to: > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.flink.runtime.executiongraph.Execution$5.onFailure(Execution.java:860) > > > > > at akka.dispatch.OnFailure.internal(Future.scala:228) > > > > > at akka.dispatch.OnFailure.internal(Future.scala:227) > > > > > at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:174) > > > > > at > akka.dispatch.japi$CallbackBridge.apply(Future.scala:171) > > > > > at > > > > > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > > > > > at > > > > > > > > > > > > > > > > > > > > scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:25) > > > > > at > > > > > > scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136) > > > > > at > > > > > > scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134) > > > > > at > > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > > > > > at > > > > > > > > > > > > > > > > > > > > scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) > > > > > at > > > > > > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > > > > at > > > > > > > > > > > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > > > > at > > > > > > > > > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > > > > > at > > > > > > > > > > > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > > > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > > > > > [Actor[akka.tcp:// > > flink@130.149.249.14:44528/user/taskmanager#82700874 > > > ]] > > > > > after [100000 ms] > > > > > at > > > > > > > > > > > > > > > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333) > > > > > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) > > > > > at > > > > > > > > > > > > > > > > > > > > scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) > > > > > at > > > > > > > > > > > > > > > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) > > > > > at > > > > > > > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) > > > > > at > > > > > > > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) > > > > > at > > > > > > > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) > > > > > at > > > > > > > akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) > > > > > at java.lang.Thread.run(Thread.java:722) > > > > > > > > > > > > > > > At first I thought, okay maybe wally004 is down; then I ssh'd into > > it. > > > > > Works fine. > > > > > > > > > > The full output can be found here: > > > > > https://gist.github.com/andralungu/d222b75cb33aea57955d > > > > > > > > > > Does anyone have any idea about what may have triggered this? :( > > > > > > > > > > Thanks! > > > > > Andra > > > > > > > > > > > > > > > |
Hi Andra,
The system should never deadlock. There is a bug somewhere if that happens. Can you check if the program is really stuck? Cheers, Fabian 2015-06-19 15:08 GMT+02:00 Till Rohrmann <[hidden email]>: > What does forever mean? Usually it's the case that you see a steep decline > in performance once the system starts spilling data to disk because of the > disk I/O bottleneck. > > The system always starts spilling to disk when it has no more memory left > for its operations. So for example if you want to sort data which cannot be > kept completely in memory, then the system has to employ external sorting. > If you can give Flink more memory, then you can avoid this problem > depending on the actual data size. > > You can observe disk I/O by using the `iotop` command for example. If you > see Flink having a high I/O usage, then this might be an indicator for > spilling. > > Cheers, > Till > > On Fri, Jun 19, 2015 at 2:54 PM Andra Lungu <[hidden email]> wrote: > > > Another problem that I encountered during the same set of > experiments(sorry > > if I am asking too many questions, I am eager to get things fixed): > > - for the same configuration, a piece of code runs perfectly on 10GB of > > input, then for 38GB it runs forever (no deadlock). > > > > I believe that may occur because Flink spills information to disk every > > time it runs out of memory... Is this fixable by increasing the number of > > buffers? > > > > That's the last question for today, promise :) > > > > Thanks! > > > > On Fri, Jun 19, 2015 at 2:40 PM, Till Rohrmann <[hidden email]> > > wrote: > > > > > Yes, it was an issue for the milestone release. > > > > > > On Fri, Jun 19, 2015 at 2:18 PM Andra Lungu <[hidden email]> > > wrote: > > > > > > > Yes, so I am using flink-0.9.0-milestone-1. Was it a problem for this > > > > version? > > > > I'll just fetch the latest master if this is the case. > > > > > > > > On Fri, Jun 19, 2015 at 2:12 PM, Till Rohrmann <[hidden email] > > > > > > wrote: > > > > > > > > > Hi Andra, > > > > > > > > > > the problem seems to be that the deployment of some tasks takes > > longer > > > > than > > > > > 100s. From the stack trace it looks as if you're not using the > latest > > > > > master. > > > > > > > > > > We had problems with previous version where the deployment call > > waited > > > > for > > > > > the TM to completely download the user code jars. For large setups > > the > > > > > BlobServer became a bottleneck and some of the deployment calls > timed > > > > out. > > > > > We updated the deployment logic so that the TM sends an immediate > ACK > > > > backt > > > > > to the JM when it receives a new task. > > > > > > > > > > Could you verify which version of Flink you're running and in case > > that > > > > > it's not the latest master, could you please try to run your > example > > > with > > > > > the latest code? > > > > > > > > > > Cheers, > > > > > Till > > > > > > > > > > On Fri, Jun 19, 2015 at 1:42 PM Andra Lungu <[hidden email] > > > > > > wrote: > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > I ran a job this morning on 30 wally nodes. DOP 224. Worked like > a > > > > charm. > > > > > > > > > > > > Then, I ran a similar job, on the exact same configuration, on > the > > > same > > > > > > input data set. The only difference between them is that the > second > > > job > > > > > > computes the degrees per vertex and, for vertices with degree > > higher > > > > > than a > > > > > > user-defined threshold, it does a bit of magic(roughly a bunch of > > > > > > coGroups). The problem is that, even before the extra functions > get > > > > > called, > > > > > > I get the following type of exception: > > > > > > > > > > > > 06/19/2015 12:06:43 CHAIN FlatMap (FlatMap at > > > > > > fromDataSet(Graph.java:171)) -> Combine(Distinct at > > > > > > fromDataSet(Graph.java:171))(222/224) switched to FAILED > > > > > > java.lang.IllegalStateException: Update task on instance > > > > > > 29073fb0b0957198a2b67569b042d56b @ wally004 - 8 slots - URL: > > > > akka.tcp:// > > > > > > flink@130.149.249.14:44528/user/taskmanager failed due to: > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.flink.runtime.executiongraph.Execution$5.onFailure(Execution.java:860) > > > > > > at akka.dispatch.OnFailure.internal(Future.scala:228) > > > > > > at akka.dispatch.OnFailure.internal(Future.scala:227) > > > > > > at > > akka.dispatch.japi$CallbackBridge.apply(Future.scala:174) > > > > > > at > > akka.dispatch.japi$CallbackBridge.apply(Future.scala:171) > > > > > > at > > > > > > > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:25) > > > > > > at > > > > > > > > scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136) > > > > > > at > > > > > > > > scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134) > > > > > > at > > > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) > > > > > > at > > > > > > > > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > > > > > at > > > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > > > > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > > > > > > [Actor[akka.tcp:// > > > flink@130.149.249.14:44528/user/taskmanager#82700874 > > > > ]] > > > > > > after [100000 ms] > > > > > > at > > > > > > > > > > > > > > > > > > > > > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333) > > > > > > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) > > > > > > at > > > > > > > > > > > > > > > > > > > > > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) > > > > > > at > > > > > > > > > akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) > > > > > > at java.lang.Thread.run(Thread.java:722) > > > > > > > > > > > > > > > > > > At first I thought, okay maybe wally004 is down; then I ssh'd > into > > > it. > > > > > > Works fine. > > > > > > > > > > > > The full output can be found here: > > > > > > https://gist.github.com/andralungu/d222b75cb33aea57955d > > > > > > > > > > > > Does anyone have any idea about what may have triggered this? :( > > > > > > > > > > > > Thanks! > > > > > > Andra > > > > > > > > > > > > > > > > > > > > > |
I think Andra wrote that there is *no deadlock*.
On Fri, Jun 19, 2015 at 3:18 PM Fabian Hueske [hidden email] <http://mailto:fhueske@...> wrote: Hi Andra, > > The system should never deadlock. > There is a bug somewhere if that happens. > > Can you check if the program is really stuck? > > Cheers, Fabian > > 2015-06-19 15:08 GMT+02:00 Till Rohrmann <[hidden email]>: > > > What does forever mean? Usually it's the case that you see a steep > decline > > in performance once the system starts spilling data to disk because of > the > > disk I/O bottleneck. > > > > The system always starts spilling to disk when it has no more memory left > > for its operations. So for example if you want to sort data which cannot > be > > kept completely in memory, then the system has to employ external > sorting. > > If you can give Flink more memory, then you can avoid this problem > > depending on the actual data size. > > > > You can observe disk I/O by using the `iotop` command for example. If you > > see Flink having a high I/O usage, then this might be an indicator for > > spilling. > > > > Cheers, > > Till > > > > On Fri, Jun 19, 2015 at 2:54 PM Andra Lungu <[hidden email]> > wrote: > > > > > Another problem that I encountered during the same set of > > experiments(sorry > > > if I am asking too many questions, I am eager to get things fixed): > > > - for the same configuration, a piece of code runs perfectly on 10GB of > > > input, then for 38GB it runs forever (no deadlock). > > > > > > I believe that may occur because Flink spills information to disk every > > > time it runs out of memory... Is this fixable by increasing the number > of > > > buffers? > > > > > > That's the last question for today, promise :) > > > > > > Thanks! > > > > > > On Fri, Jun 19, 2015 at 2:40 PM, Till Rohrmann <[hidden email]> > > > wrote: > > > > > > > Yes, it was an issue for the milestone release. > > > > > > > > On Fri, Jun 19, 2015 at 2:18 PM Andra Lungu <[hidden email]> > > > wrote: > > > > > > > > > Yes, so I am using flink-0.9.0-milestone-1. Was it a problem for > this > > > > > version? > > > > > I'll just fetch the latest master if this is the case. > > > > > > > > > > On Fri, Jun 19, 2015 at 2:12 PM, Till Rohrmann < > [hidden email] > > > > > > > > wrote: > > > > > > > > > > > Hi Andra, > > > > > > > > > > > > the problem seems to be that the deployment of some tasks takes > > > longer > > > > > than > > > > > > 100s. From the stack trace it looks as if you're not using the > > latest > > > > > > master. > > > > > > > > > > > > We had problems with previous version where the deployment call > > > waited > > > > > for > > > > > > the TM to completely download the user code jars. For large > setups > > > the > > > > > > BlobServer became a bottleneck and some of the deployment calls > > timed > > > > > out. > > > > > > We updated the deployment logic so that the TM sends an immediate > > ACK > > > > > backt > > > > > > to the JM when it receives a new task. > > > > > > > > > > > > Could you verify which version of Flink you're running and in > case > > > that > > > > > > it's not the latest master, could you please try to run your > > example > > > > with > > > > > > the latest code? > > > > > > > > > > > > Cheers, > > > > > > Till > > > > > > > > > > > > On Fri, Jun 19, 2015 at 1:42 PM Andra Lungu < > [hidden email] > > > > > > > > wrote: > > > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > > > I ran a job this morning on 30 wally nodes. DOP 224. Worked > like > > a > > > > > charm. > > > > > > > > > > > > > > Then, I ran a similar job, on the exact same configuration, on > > the > > > > same > > > > > > > input data set. The only difference between them is that the > > second > > > > job > > > > > > > computes the degrees per vertex and, for vertices with degree > > > higher > > > > > > than a > > > > > > > user-defined threshold, it does a bit of magic(roughly a bunch > of > > > > > > > coGroups). The problem is that, even before the extra functions > > get > > > > > > called, > > > > > > > I get the following type of exception: > > > > > > > > > > > > > > 06/19/2015 12:06:43 CHAIN FlatMap (FlatMap at > > > > > > > fromDataSet(Graph.java:171)) -> Combine(Distinct at > > > > > > > fromDataSet(Graph.java:171))(222/224) switched to FAILED > > > > > > > java.lang.IllegalStateException: Update task on instance > > > > > > > 29073fb0b0957198a2b67569b042d56b @ wally004 - 8 slots - URL: > > > > > akka.tcp:// > > > > > > > flink@130.149.249.14:44528/user/taskmanager failed due to: > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.flink.runtime.executiongraph.Execution$5.onFailure(Execution.java:860) > > > > > > > at akka.dispatch.OnFailure.internal(Future.scala:228) > > > > > > > at akka.dispatch.OnFailure.internal(Future.scala:227) > > > > > > > at > > > akka.dispatch.japi$CallbackBridge.apply(Future.scala:174) > > > > > > > at > > > akka.dispatch.japi$CallbackBridge.apply(Future.scala:171) > > > > > > > at > > > > > > > > > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:25) > > > > > > > at > > > > > > > > > > scala.concurrent.Future$anonfun$onFailure$1.apply(Future.scala:136) > > > > > > > at > > > > > > > > > > scala.concurrent.Future$anonfun$onFailure$1.apply(Future.scala:134) > > > > > > > at > > > > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.concurrent.impl.ExecutionContextImpl$anon$3.exec(ExecutionContextImpl.scala:107) > > > > > > > at > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > > > > > > at > > > > > > > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > > > > > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > > > > > > > [Actor[akka.tcp:// > > > > flink@130.149.249.14:44528/user/taskmanager#82700874 > > > > > ]] > > > > > > > after [100000 ms] > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > akka.pattern.PromiseActorRef$anonfun$1.apply$mcV$sp(AskSupport.scala:333) > > > > > > > at akka.actor.Scheduler$anon$7.run(Scheduler.scala:117) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$unbatchedExecute(Future.scala:694) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$anon$8.executeBucket$1(Scheduler.scala:419) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$anon$8.nextTick(Scheduler.scala:423) > > > > > > > at > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$anon$8.run(Scheduler.scala:375) > > > > > > > at java.lang.Thread.run(Thread.java:722) > > > > > > > > > > > > > > > > > > > > > At first I thought, okay maybe wally004 is down; then I ssh'd > > into > > > > it. > > > > > > > Works fine. > > > > > > > > > > > > > > The full output can be found here: > > > > > > > https://gist.github.com/andralungu/d222b75cb33aea57955d > > > > > > > > > > > > > > Does anyone have any idea about what may have triggered this? > :( > > > > > > > > > > > > > > Thanks! > > > > > > > Andra > > > > > > > > > > > > > > > > > > > > > > > > > > > > |
woops, sorry!
Whenever I read the word "deadlock" I getting a bit nervous and distracted ;-) 2015-06-19 15:21 GMT+02:00 Till Rohrmann <[hidden email]>: > I think Andra wrote that there is *no deadlock*. > > On Fri, Jun 19, 2015 at 3:18 PM Fabian Hueske [hidden email] > <http://mailto:fhueske@...> wrote: > > Hi Andra, > > > > The system should never deadlock. > > There is a bug somewhere if that happens. > > > > Can you check if the program is really stuck? > > > > Cheers, Fabian > > > > 2015-06-19 15:08 GMT+02:00 Till Rohrmann <[hidden email]>: > > > > > What does forever mean? Usually it's the case that you see a steep > > decline > > > in performance once the system starts spilling data to disk because of > > the > > > disk I/O bottleneck. > > > > > > The system always starts spilling to disk when it has no more memory > left > > > for its operations. So for example if you want to sort data which > cannot > > be > > > kept completely in memory, then the system has to employ external > > sorting. > > > If you can give Flink more memory, then you can avoid this problem > > > depending on the actual data size. > > > > > > You can observe disk I/O by using the `iotop` command for example. If > you > > > see Flink having a high I/O usage, then this might be an indicator for > > > spilling. > > > > > > Cheers, > > > Till > > > > > > On Fri, Jun 19, 2015 at 2:54 PM Andra Lungu <[hidden email]> > > wrote: > > > > > > > Another problem that I encountered during the same set of > > > experiments(sorry > > > > if I am asking too many questions, I am eager to get things fixed): > > > > - for the same configuration, a piece of code runs perfectly on 10GB > of > > > > input, then for 38GB it runs forever (no deadlock). > > > > > > > > I believe that may occur because Flink spills information to disk > every > > > > time it runs out of memory... Is this fixable by increasing the > number > > of > > > > buffers? > > > > > > > > That's the last question for today, promise :) > > > > > > > > Thanks! > > > > > > > > On Fri, Jun 19, 2015 at 2:40 PM, Till Rohrmann <[hidden email] > > > > > > wrote: > > > > > > > > > Yes, it was an issue for the milestone release. > > > > > > > > > > On Fri, Jun 19, 2015 at 2:18 PM Andra Lungu <[hidden email] > > > > > > wrote: > > > > > > > > > > > Yes, so I am using flink-0.9.0-milestone-1. Was it a problem for > > this > > > > > > version? > > > > > > I'll just fetch the latest master if this is the case. > > > > > > > > > > > > On Fri, Jun 19, 2015 at 2:12 PM, Till Rohrmann < > > [hidden email] > > > > > > > > > > wrote: > > > > > > > > > > > > > Hi Andra, > > > > > > > > > > > > > > the problem seems to be that the deployment of some tasks takes > > > > longer > > > > > > than > > > > > > > 100s. From the stack trace it looks as if you're not using the > > > latest > > > > > > > master. > > > > > > > > > > > > > > We had problems with previous version where the deployment call > > > > waited > > > > > > for > > > > > > > the TM to completely download the user code jars. For large > > setups > > > > the > > > > > > > BlobServer became a bottleneck and some of the deployment calls > > > timed > > > > > > out. > > > > > > > We updated the deployment logic so that the TM sends an > immediate > > > ACK > > > > > > backt > > > > > > > to the JM when it receives a new task. > > > > > > > > > > > > > > Could you verify which version of Flink you're running and in > > case > > > > that > > > > > > > it's not the latest master, could you please try to run your > > > example > > > > > with > > > > > > > the latest code? > > > > > > > > > > > > > > Cheers, > > > > > > > Till > > > > > > > > > > > > > > On Fri, Jun 19, 2015 at 1:42 PM Andra Lungu < > > [hidden email] > > > > > > > > > > wrote: > > > > > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > > > > > I ran a job this morning on 30 wally nodes. DOP 224. Worked > > like > > > a > > > > > > charm. > > > > > > > > > > > > > > > > Then, I ran a similar job, on the exact same configuration, > on > > > the > > > > > same > > > > > > > > input data set. The only difference between them is that the > > > second > > > > > job > > > > > > > > computes the degrees per vertex and, for vertices with degree > > > > higher > > > > > > > than a > > > > > > > > user-defined threshold, it does a bit of magic(roughly a > bunch > > of > > > > > > > > coGroups). The problem is that, even before the extra > functions > > > get > > > > > > > called, > > > > > > > > I get the following type of exception: > > > > > > > > > > > > > > > > 06/19/2015 12:06:43 CHAIN FlatMap (FlatMap at > > > > > > > > fromDataSet(Graph.java:171)) -> Combine(Distinct at > > > > > > > > fromDataSet(Graph.java:171))(222/224) switched to FAILED > > > > > > > > java.lang.IllegalStateException: Update task on instance > > > > > > > > 29073fb0b0957198a2b67569b042d56b @ wally004 - 8 slots - URL: > > > > > > akka.tcp:// > > > > > > > > flink@130.149.249.14:44528/user/taskmanager failed due to: > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.flink.runtime.executiongraph.Execution$5.onFailure(Execution.java:860) > > > > > > > > at akka.dispatch.OnFailure.internal(Future.scala:228) > > > > > > > > at akka.dispatch.OnFailure.internal(Future.scala:227) > > > > > > > > at > > > > akka.dispatch.japi$CallbackBridge.apply(Future.scala:174) > > > > > > > > at > > > > akka.dispatch.japi$CallbackBridge.apply(Future.scala:171) > > > > > > > > at > > > > > > > > > > > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:25) > > > > > > > > at > > > > > > > > > > > > scala.concurrent.Future$anonfun$onFailure$1.apply(Future.scala:136) > > > > > > > > at > > > > > > > > > > > > scala.concurrent.Future$anonfun$onFailure$1.apply(Future.scala:134) > > > > > > > > at > > > > > scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.concurrent.impl.ExecutionContextImpl$anon$3.exec(ExecutionContextImpl.scala:107) > > > > > > > > at > > > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > > > > > > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > > > > > > > > [Actor[akka.tcp:// > > > > > flink@130.149.249.14:44528/user/taskmanager#82700874 > > > > > > ]] > > > > > > > > after [100000 ms] > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > akka.pattern.PromiseActorRef$anonfun$1.apply$mcV$sp(AskSupport.scala:333) > > > > > > > > at > akka.actor.Scheduler$anon$7.run(Scheduler.scala:117) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$unbatchedExecute(Future.scala:694) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$anon$8.executeBucket$1(Scheduler.scala:419) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$anon$8.nextTick(Scheduler.scala:423) > > > > > > > > at > > > > > > > > > > > > > > > akka.actor.LightArrayRevolverScheduler$anon$8.run(Scheduler.scala:375) > > > > > > > > at java.lang.Thread.run(Thread.java:722) > > > > > > > > > > > > > > > > > > > > > > > > At first I thought, okay maybe wally004 is down; then I > ssh'd > > > into > > > > > it. > > > > > > > > Works fine. > > > > > > > > > > > > > > > > The full output can be found here: > > > > > > > > https://gist.github.com/andralungu/d222b75cb33aea57955d > > > > > > > > > > > > > > > > Does anyone have any idea about what may have triggered this? > > :( > > > > > > > > > > > > > > > > Thanks! > > > > > > > > Andra > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > |
In reply to this post by Andra Lungu
On 19 Jun 2015, at 14:53, Andra Lungu <[hidden email]> wrote:
> Another problem that I encountered during the same set of experiments(sorry > if I am asking too many questions, I am eager to get things fixed): > - for the same configuration, a piece of code runs perfectly on 10GB of > input, then for 38GB it runs forever (no deadlock). > > I believe that may occur because Flink spills information to disk every > time it runs out of memory... Is this fixable by increasing the number of > buffers? If you are referring to the number of network buffers configuration key, that should be unrelated. If this really is the issue, you can increase the heap size for the task managers. Have you confirmed your suspicion as Till suggested via iotop? :) – Ufuk |
In reply to this post by Andra Lungu
Hi Andra,
I would try increasing the memory per task manager, i.e. on a machine with 8 CPUs and 16GBs of memory, instead of spawning 8 TMs with 2GB each, I would try to spawn 2 TMs of 8GBs each. This might help with the spilling problem (in case that the CPU is not your bottleneck, this might even speed up the computations by avoiding spilling) and get you unstuck. Cheers, Asterios On Fri, Jun 19, 2015 at 4:16 PM, Ufuk Celebi <[hidden email]> wrote: > On 19 Jun 2015, at 14:53, Andra Lungu <[hidden email]> wrote: > > > Another problem that I encountered during the same set of > experiments(sorry > > if I am asking too many questions, I am eager to get things fixed): > > - for the same configuration, a piece of code runs perfectly on 10GB of > > input, then for 38GB it runs forever (no deadlock). > > > > I believe that may occur because Flink spills information to disk every > > time it runs out of memory... Is this fixable by increasing the number of > > buffers? > > If you are referring to the number of network buffers configuration key, > that should be unrelated. If this really is the issue, you can increase the > heap size for the task managers. > > Have you confirmed your suspicion as Till suggested via iotop? :) > > – Ufuk |
Free forum by Nabble | Edit this page |