Hey,

We managed to produce code for which the legacy Stratosphere 0.5 release implementation works nicely; however, the updated Flink 0.6 release implementation hangs for slightly larger inputs. Please check out the issue here: https://github.com/mbalassi/als-comparison

Any suggestions are welcome.

Cheers,

Marton
Hey Marton,

Thanks for reporting the issue and the link to the repo to reproduce the problem. I will look into it later today. If you like, you could provide some more information in the meantime:

- How is the CPU load?
- What are the TM logs saying?
- Can you give a stack trace? Where is it hanging?
CPU load:

Tested on my 4-core machine: the CPU load spikes at the beginning of the job and stays relatively high during the whole job when run with version 0.5, which then finishes gracefully. On version 0.6 it works seemingly well until the hang. Interestingly enough, even when no more log messages appear, my CPU utilization stays 10-15% higher per core than without the job running.

Logs:

For both implementations it starts like this:

09/04/2014 17:05:51: Job execution switched to status SCHEDULED
09/04/2014 17:05:51: DataSource (CSV Input (|) /home/mbalassi/git/als-comparison/data/sampledb2b.csv.txt) (1/1) switched to SCHEDULED
09/04/2014 17:05:51: Reduce(Create q as a random matrix) (1/1) switched to SCHEDULED
09/04/2014 17:05:51: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to SCHEDULED
09/04/2014 17:05:51: Join(Sends the columns of q with multiple keys) (1/1) switched to SCHEDULED
09/04/2014 17:05:51: CoGroup (For fixed q calculates optimal p) (1/1) switched to SCHEDULED
09/04/2014 17:05:51: Join(Sends the rows of p with multiple keys)) (1/1) switched to SCHEDULED
09/04/2014 17:05:51: CoGroup (For fixed p calculates optimal q) (1/1) switched to SCHEDULED
09/04/2014 17:05:51: Fake Tail (1/1) switched to SCHEDULED
09/04/2014 17:05:51: Join(Sends the columns of q with multiple keys) (1/1) switched to SCHEDULED
09/04/2014 17:05:51: CoGroup (For fixed q calculates optimal p) (1/1) switched to SCHEDULED

[Omitted quite some healthy messages...]

09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1) switched to READY
09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1) switched to STARTING
09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1) switched to RUNNING
09/04/2014 17:05:53: CoGroup (For fixed p calculates optimal q) (1/1) switched to READY
09/04/2014 17:05:53: Fake Tail (1/1) switched to READY
09/04/2014 17:05:53: CoGroup (For fixed p calculates optimal q) (1/1) switched to STARTING
09/04/2014 17:05:53: Fake Tail (1/1) switched to STARTING
09/04/2014 17:05:54: CoGroup (For fixed p calculates optimal q) (1/1) switched to RUNNING
09/04/2014 17:05:54: Fake Tail (1/1) switched to RUNNING
09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1) switched to READY
09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1) switched to STARTING
09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1) switched to RUNNING
09/04/2014 17:05:54: CoGroup (For fixed q calculates optimal p) (1/1) switched to READY
09/04/2014 17:05:54: CoGroup (For fixed q calculates optimal p) (1/1) switched to STARTING
09/04/2014 17:05:55: CoGroup (For fixed q calculates optimal p) (1/1) switched to RUNNING

Flink stops here, Strato continues:

09/04/2014 17:09:01: DataSource(CSV Input (|)) (1/1) switched to FINISHING
09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to READY
09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to STARTING
09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to RUNNING
09/04/2014 17:09:03: Reduce(Create q as a random matrix) (1/1) switched to FINISHING
09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to READY
09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to STARTING
09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to RUNNING
09/04/2014 17:09:09: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to FINISHING
09/04/2014 17:09:09: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@7ea742a1) (1/1) switched to READY
09/04/2014 17:09:09: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@7ea742a1) (1/1) switched to STARTING

[Omitted quite some healthy messages...]

09/04/2014 17:09:10: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to FINISHED
09/04/2014 17:09:10: CoGroup(For fixed p calculates optimal q) (1/1) switched to FINISHED
09/04/2014 17:09:10: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3) (1/1) switched to RUNNING
09/04/2014 17:09:10: CoGroup(For fixed q calculates optimal p) (1/1) switched to FINISHING
09/04/2014 17:09:10: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3) (1/1) switched to FINISHING
09/04/2014 17:09:11: Join(Sends the columns of q with multiple keys) (1/1) switched to FINISHED
09/04/2014 17:09:11: CoGroup(For fixed q calculates optimal p) (1/1) switched to FINISHED
09/04/2014 17:09:11: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3) (1/1) switched to FINISHED
09/04/2014 17:09:11: Job execution switched to status FINISHED
Hi Marton,
A jstack Java stacktrace can help to identify where the code got stuck. Can you open a JIRA and post a stacktrace there?

Cheers, Fabian
Just talked to Marton and we figured out the issue with the help of
Stephan. ;-) Marton will file an issue.
Unnecessarily constrained the initialization of the superstep kickoff latch to the iteration head. I think it will work once we make it a "first requester initializes" latch.

Stephan
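A minimal sketch of what such a "first requester initializes" structure could look like (hypothetical class and method names, not the actual Flink code): whichever task asks first creates the latch, so an intermediate task no longer depends on the head task having run its initialization.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of a "first requester initializes" latch registry.
// Head and intermediate tasks all call getOrCreate(); the first caller
// materializes the latch, every later caller receives the same instance,
// so initialization no longer hinges on the head task starting first.
public class SuperstepLatchBroker {

    private final ConcurrentHashMap<String, CountDownLatch> latches =
            new ConcurrentHashMap<>();

    public CountDownLatch getOrCreate(String iterationId) {
        // Atomic create-if-missing: safe even if several tasks race here.
        return latches.computeIfAbsent(iterationId, id -> new CountDownLatch(1));
    }

    // Called when an iteration finishes, so ids do not leak across jobs.
    public void remove(String iterationId) {
        latches.remove(iterationId);
    }
}
```

The key property is that `computeIfAbsent` makes creation atomic, so it does not matter in which order the scheduler brings up the tasks.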
Thanks, Ufuk found the relevant part in the stacktrace:
"Join(Sends the rows of p with multiple keys)) (1/1)" daemon prio=10 tid=0x00007f8928014800 nid=0x998 waiting on condition [0x00007f8912eed000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00000007d2668ea0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:374)
        at org.apache.flink.runtime.iterative.concurrent.Broker.get(Broker.java:63)
        at org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:84)
        at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:375)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:265)
        at java.lang.Thread.run(Thread.java:744)

This part waits for the iteration head, which has not been started yet, and thus induces a deadlock.

Opened a JIRA issue on it: https://issues.apache.org/jira/browse/FLINK-1088

Thanks for the quick response by the way!
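For context, the `Broker.get` call in the trace blocks on an `ArrayBlockingQueue` until the head task hands the shared object in. A simplified model of that hand-over (assumed from the stack trace, not the real implementation) makes the failure mode easy to see: if the head is never scheduled, `take()` parks forever.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, assumed model of the runtime's hand-over broker: the
// iteration head calls handIn(), intermediate tasks call get() and
// block on the queue until the head delivers the object. If the head
// never runs (as in the reported hang), get() waits indefinitely.
public class HandOverBroker<V> {

    private final ConcurrentHashMap<String, BlockingQueue<V>> mailboxes =
            new ConcurrentHashMap<>();

    private BlockingQueue<V> mailbox(String key) {
        return mailboxes.computeIfAbsent(key, k -> new ArrayBlockingQueue<>(1));
    }

    // Called by the iteration head once it is up.
    public void handIn(String key, V value) throws InterruptedException {
        mailbox(key).put(value);
    }

    // Called by intermediate tasks; parks (as in the jstack output)
    // until the matching handIn() happens.
    public V get(String key) throws InterruptedException {
        return mailbox(key).take();
    }
}
```

This also explains the residual 10-15% CPU load: the worker JVM is alive with parked threads and its usual bookkeeping, but no task makes progress.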
The above suggested solution would work for the SuperstepKickoffLatch, but unfortunately not for the other broker structures, so the problem does persist in other cases.

A correct solution would be to prevent backpressure on forking flows, which will come as part of the network-stack rewrite. Since that solves it properly, I would prefer to go for that solution.

A temporary workaround would be the following:

- Find the data sets that are consumed both inside and outside the iteration. Those are typically preprocessed matrices or so.
- Duplicate that code to actually have two different subprograms (producing different data sets).
- Use a different data set inside the iteration than outside the iteration.

Stephan
> > > > > > 09/04/2014 17:09:10: PartialSolution (BulkIteration (Bulk Iteration)) > > (1/1) > > > switched to FINISHED > > > 09/04/2014 17:09:10: CoGroup(For fixed p calculates optimal q) (1/1) > > > switched to FINISHED > > > 09/04/2014 17:09:10: > > > > > > > > > DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3 > > > ) > > > (1/1) switched to RUNNING > > > 09/04/2014 17:09:10: CoGroup(For fixed q calculates optimal p) (1/1) > > > switched to FINISHING > > > 09/04/2014 17:09:10: > > > > > > > > > DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3 > > > ) > > > (1/1) switched to FINISHING > > > 09/04/2014 17:09:11: Join(Sends the columns of q with multiple keys) > > (1/1) > > > switched to FINISHED > > > 09/04/2014 17:09:11: CoGroup(For fixed q calculates optimal p) (1/1) > > > switched to FINISHED > > > 09/04/2014 17:09:11: > > > > > > > > > DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3 > > > ) > > > (1/1) switched to FINISHED > > > 09/04/2014 17:09:11: Job execution switched to status FINISHED > > > > > > > > > > > > > > > > > > On Thu, Sep 4, 2014 at 3:33 PM, Ufuk Celebi <[hidden email]> wrote: > > > > > > > Hey Marton, > > > > > > > > thanks for reporting the issue and the link to the repo to reproduce > > the > > > > problem. I will look into it later today. > > > > > > > > If you like, you could provide some more information in the meantime: > > > > > > > > - How the CPU load? > > > > - What are TM logs saying? > > > > - Can you give a stack trace? Where is it hanging? 
> > > > > > > > > > > > > > > > On Thu, Sep 4, 2014 at 3:14 PM, Márton Balassi < > > [hidden email] > > > > > > > > wrote: > > > > > > > > > Hey, > > > > > > > > > > We managed to produce a code, for which the legacy Stratophere 0.5 > > > > release > > > > > implementation works nicely, however the updated Flink 0.6 release > > > > > implementation hangs up for slightly larger inputs. > > > > > > > > > > > > > > > Please check out the issue here: > > > > > https://github.com/mbalassi/als-comparison > > > > > > > > > > Any suggestions are welcome. > > > > > > > > > > Cheers, > > > > > > > > > > Marton > > > > > > > > > > > > > > > |
Thanks, the workaround works flawlessly on my side.
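The temporary workaround quoted below boils down to one idea: invoke the producing code twice, so the consumer inside the iteration and the consumer outside it no longer share a single data set (and hence no runtime channel). A minimal, Flink-free sketch of that pattern, with all names illustrative rather than taken from the actual job:

```java
import java.util.ArrayList;
import java.util.List;

// Not Flink code: a stand-in illustrating the "duplicate the subprogram"
// workaround. buildQ() models the preprocessing step; calling it twice
// yields two independent data sets with identical content.
public class DuplicatedSourceSketch {

    // Stand-in for the preprocessing subprogram (e.g. building the matrix q).
    static List<Double> buildQ() {
        List<Double> q = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            q.add(1.0 / i);
        }
        return q;
    }

    public static void main(String[] args) {
        // Before the workaround, one data set fed both the iteration and the
        // final output; after it, each consumer has its own producer.
        List<Double> qInsideIteration = buildQ();  // consumed only inside the iteration
        List<Double> qOutsideIteration = buildQ(); // consumed only after the iteration

        // Equal content, but independent objects: nothing is shared between
        // the two consumers, so neither can backpressure the other.
        System.out.println(qInsideIteration.equals(qOutsideIteration)); // true
        System.out.println(qInsideIteration == qOutsideIteration);      // false
    }
}
```

In the actual ALS job this corresponds to running the matrix-preprocessing subprogram twice, once feeding the bulk iteration and once feeding the final join/sink.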
On Thu, Sep 4, 2014 at 6:49 PM, Stephan Ewen <[hidden email]> wrote:

> The above suggested solution would work for the SuperstepKickoffLatch, but
> unfortunately not for the other broker structures. So the problem does
> persist in other cases.
>
> A correct solution would be to prevent backpressure on forking flows, which
> will come as part of the network-stack rewrite. That solves it properly; I
> would prefer to go for that solution.
>
> A temporary workaround would be the following:
>
> - Find the data sets that are consumed both inside and outside the
>   iteration. Those are typically preprocessed matrices or so.
> - Duplicate that code to actually have two different subprograms
>   (producing different data sets).
> - Use a different data set inside the iteration and outside the iteration.
>
> Stephan
>
> On Thu, Sep 4, 2014 at 6:14 PM, Márton Balassi <[hidden email]> wrote:
>
> > Thanks, Ufuk found the relevant part in the stacktrace:
> >
> > "Join(Sends the rows of p with multiple keys)) (1/1)" daemon prio=10
> > tid=0x00007f8928014800 nid=0x998 waiting on condition [0x00007f8912eed000]
> >    java.lang.Thread.State: WAITING (parking)
> >         at sun.misc.Unsafe.park(Native Method)
> >         - parking to wait for <0x00000007d2668ea0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> >         at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:374)
> >         at org.apache.flink.runtime.iterative.concurrent.Broker.get(Broker.java:63)
> >         at org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:84)
> >         at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:375)
> >         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:265)
> >         at java.lang.Thread.run(Thread.java:744)
> >
> > This part waits for the iteration head, which has not been started yet,
> > and thus induces a deadlock.
> >
> > Opened a JIRA issue on it:
> > https://issues.apache.org/jira/browse/FLINK-1088
> >
> > Thanks for the quick response by the way!
> >
> > On Thu, Sep 4, 2014 at 5:39 PM, Fabian Hueske <[hidden email]> wrote:
> >
> > > Hi Marton,
> > >
> > > a jstack Java stacktrace can help to identify where the code got stuck.
> > > Can you open a JIRA and post a stacktrace there?
> > >
> > > Cheers, Fabian
> > >
> > > 2014-09-04 17:25 GMT+02:00 Márton Balassi <[hidden email]>:
> > >
> > > > CPU load:
> > > >
> > > > Tested on my 4-core machine, the CPU load spikes up at the beginning
> > > > of the job and stays relatively high during the whole job when run
> > > > with version 0.5, then finishes gracefully. On version 0.6 it works
> > > > seemingly well until the hang-up. Interestingly enough, even when no
> > > > more log messages appear, my CPU utilization stays 10-15% higher per
> > > > core than without running the job.
> > > > [Omitted the rest of the quoted thread; it repeats the logs and messages shown above...]
|
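To make the stack trace above easier to read: the blocking call sits in a broker-style handoff between the iteration head and the intermediate task, built on `ArrayBlockingQueue.take()`. The following self-contained sketch (not Flink's actual `Broker` implementation; the class name and the bounded `tryGet` helper are illustrative) shows why the intermediate task parks forever when the head is never scheduled:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the handoff pattern from the stack trace. The
// intermediate task calls get(), which parks on an ArrayBlockingQueue until
// the iteration head hands the shared structure in. If the head never runs,
// take() blocks forever -- the WAITING (parking) state in the jstack output.
class BrokerSketch<V> {
    private final ArrayBlockingQueue<V> slot = new ArrayBlockingQueue<>(1);

    // Called by the iteration head once it starts.
    void handIn(V value) throws InterruptedException {
        slot.put(value);
    }

    // Called by the intermediate task; blocks until the head has handed in.
    V get() throws InterruptedException {
        return slot.take();
    }

    // Bounded variant used below so the demo terminates instead of hanging.
    V tryGet(long timeoutMillis) throws InterruptedException {
        return slot.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}

public class IterationDeadlockSketch {
    public static void main(String[] args) throws InterruptedException {
        BrokerSketch<String> broker = new BrokerSketch<>();

        // Head not started yet: the bounded wait comes back empty. With the
        // real blocking get(), the task thread would park indefinitely here.
        System.out.println(broker.tryGet(100) == null ? "blocked" : "ok");

        // Once the head runs and hands in, the same handoff succeeds at once.
        broker.handIn("superstep-kickoff-latch");
        System.out.println(broker.get()); // prints "superstep-kickoff-latch"
    }
}
```

The fixes discussed in the thread attack the scheduling side, ensuring that backpressure on a forking flow cannot keep the head from starting at all; the bounded `tryGet` exists only so this demo terminates.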
Free forum by Nabble