Flink 0.6 hang up

9 messages

Flink 0.6 hang up

Márton Balassi
Hey,

We managed to produce code for which the legacy Stratosphere 0.5
implementation works nicely; however, the updated Flink 0.6
implementation hangs for slightly larger inputs.


Please check out the issue here:
https://github.com/mbalassi/als-comparison

Any suggestions are welcome.

Cheers,

Marton

Re: Flink 0.6 hang up

Ufuk Celebi-2
Hey Marton,

Thanks for reporting the issue and for the link to the repo that reproduces
the problem. I will look into it later today.

If you like, you could provide some more information in the meantime:

- How is the CPU load?
- What do the TaskManager (TM) logs say?
- Can you give a stack trace? Where is it hanging?



Reply | Threaded
Open this post in threaded view
|

Re: Flink 0.6 hang up

Márton Balassi
CPU load:

Tested on my 4-core machine: with version 0.5, the CPU load spikes at the
beginning of the job, stays relatively high throughout, and the job finishes
gracefully. With version 0.6 it runs seemingly well until the hang.
Interestingly, even after log messages stop appearing, my CPU utilization
stays 10-15% higher per core than when the job is not running.

Logs:

For both implementations, the log starts like this:

09/04/2014 17:05:51: Job execution switched to status SCHEDULED
09/04/2014 17:05:51: DataSource (CSV Input (|)
/home/mbalassi/git/als-comparison/data/sampledb2b.csv.txt) (1/1) switched
to SCHEDULED
09/04/2014 17:05:51: Reduce(Create q as a random matrix) (1/1) switched to
SCHEDULED
09/04/2014 17:05:51: PartialSolution (BulkIteration (Bulk Iteration)) (1/1)
switched to SCHEDULED
09/04/2014 17:05:51: Join(Sends the columns of q with multiple keys) (1/1)
switched to SCHEDULED
09/04/2014 17:05:51: CoGroup (For fixed q calculates optimal p) (1/1)
switched to SCHEDULED
09/04/2014 17:05:51: Join(Sends the rows of p with multiple keys)) (1/1)
switched to SCHEDULED
09/04/2014 17:05:51: CoGroup (For fixed p calculates optimal q) (1/1)
switched to SCHEDULED
09/04/2014 17:05:51: Fake Tail (1/1) switched to SCHEDULED
09/04/2014 17:05:51: Join(Sends the columns of q with multiple keys) (1/1)
switched to SCHEDULED
09/04/2014 17:05:51: CoGroup (For fixed q calculates optimal p) (1/1)
switched to SCHEDULED

[Omitted quite some healthy messages...]

09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1)
switched to READY
09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1)
switched to STARTING
09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1)
switched to RUNNING
09/04/2014 17:05:53: CoGroup (For fixed p calculates optimal q) (1/1)
switched to READY
09/04/2014 17:05:53: Fake Tail (1/1) switched to READY
09/04/2014 17:05:53: CoGroup (For fixed p calculates optimal q) (1/1)
switched to STARTING
09/04/2014 17:05:53: Fake Tail (1/1) switched to STARTING
09/04/2014 17:05:54: CoGroup (For fixed p calculates optimal q) (1/1)
switched to RUNNING
09/04/2014 17:05:54: Fake Tail (1/1) switched to RUNNING
09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1)
switched to READY
09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1)
switched to STARTING
09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1)
switched to RUNNING
09/04/2014 17:05:54: CoGroup (For fixed q calculates optimal p) (1/1)
switched to READY
09/04/2014 17:05:54: CoGroup (For fixed q calculates optimal p) (1/1)
switched to STARTING
09/04/2014 17:05:55: CoGroup (For fixed q calculates optimal p) (1/1)
switched to RUNNING

Flink stops here; Stratosphere continues:

09/04/2014 17:09:01: DataSource(CSV Input (|)) (1/1) switched to FINISHING
09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1)
switched to READY
09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1)
switched to STARTING
09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1)
switched to RUNNING
09/04/2014 17:09:03: Reduce(Create q as a random matrix) (1/1) switched to
FINISHING
09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to
READY
09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to
STARTING
09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to
RUNNING
09/04/2014 17:09:09: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to
FINISHING
09/04/2014 17:09:09:
DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@7ea742a1)
(1/1) switched to READY
09/04/2014 17:09:09:
DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@7ea742a1)
(1/1) switched to STARTING

[Omitted quite some healthy messages...]

09/04/2014 17:09:10: PartialSolution (BulkIteration (Bulk Iteration)) (1/1)
switched to FINISHED
09/04/2014 17:09:10: CoGroup(For fixed p calculates optimal q) (1/1)
switched to FINISHED
09/04/2014 17:09:10:
DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3)
(1/1) switched to RUNNING
09/04/2014 17:09:10: CoGroup(For fixed q calculates optimal p) (1/1)
switched to FINISHING
09/04/2014 17:09:10:
DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3)
(1/1) switched to FINISHING
09/04/2014 17:09:11: Join(Sends the columns of q with multiple keys) (1/1)
switched to FINISHED
09/04/2014 17:09:11: CoGroup(For fixed q calculates optimal p) (1/1)
switched to FINISHED
09/04/2014 17:09:11:
DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3)
(1/1) switched to FINISHED
09/04/2014 17:09:11: Job execution switched to status FINISHED






Re: Flink 0.6 hang up

Fabian Hueske
Hi Marton,

A jstack thread dump can help identify where the code got stuck.
Can you open a JIRA issue and post the stack trace there?

Cheers, Fabian
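As an aside, the same information jstack prints can also be pulled in-process via `ThreadMXBean`. A minimal sketch (the class name is illustrative, not part of the thread):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;

// Prints the name and state of every live thread in this JVM --
// the in-process equivalent of running `jstack <pid>` from a shell.
public class StackDumpSketch {
    public static void main(String[] args) {
        ThreadInfo[] infos = ManagementFactory.getThreadMXBean()
                .dumpAllThreads(true, true);
        for (ThreadInfo info : infos) {
            // A task stuck waiting on a shared structure would show up as WAITING.
            System.out.println(info.getThreadName() + " -> " + info.getThreadState());
        }
    }
}
```

This is handy when attaching jstack to the TaskManager process is inconvenient.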



Re: Flink 0.6 hang up

Ufuk Celebi-2
Just talked to Marton and we figured out the issue with the help of
Stephan. ;-)

Marton will file an issue.



Re: Flink 0.6 hang up

Stephan Ewen
We unnecessarily constrained the initialization of the superstep kickoff
latch to the iteration head.

I think it will work once we make it a "first requester initializes" latch.

Stephan
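A hedged sketch of what a "first requester initializes" latch could look like (illustrative, not Flink's actual implementation):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Sketch of a "first requester initializes" latch registry: whichever task
// asks first -- head or intermediate -- creates the latch, so no task blocks
// merely waiting for another task to register it.
public class LatchBroker {
    private final ConcurrentHashMap<String, CountDownLatch> latches =
            new ConcurrentHashMap<>();

    public CountDownLatch getOrCreate(String iterationId) {
        // computeIfAbsent guarantees exactly one latch per id,
        // created by whichever thread requests it first.
        return latches.computeIfAbsent(iterationId, id -> new CountDownLatch(1));
    }

    public static void main(String[] args) throws Exception {
        LatchBroker broker = new LatchBroker();
        // The intermediate task may ask before the head is even scheduled.
        CountDownLatch fromIntermediate = broker.getOrCreate("superstep-1");
        CountDownLatch fromHead = broker.getOrCreate("superstep-1");
        System.out.println("same latch: " + (fromIntermediate == fromHead));
        fromHead.countDown();      // head signals the superstep kickoff
        fromIntermediate.await();  // intermediate proceeds immediately
        System.out.println("kickoff received");
    }
}
```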

Re: Flink 0.6 hang up

Márton Balassi
Thanks! Ufuk found the relevant part in the stack trace:

"Join(Sends the rows of p with multiple keys)) (1/1)" daemon prio=10
tid=0x00007f8928014800 nid=0x998 waiting on condition [0x00007f8912eed000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007d2668ea0> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at
java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:374)
        at
org.apache.flink.runtime.iterative.concurrent.Broker.get(Broker.java:63)
        at
org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:84)
        at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:375)
        at
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:265)
        at java.lang.Thread.run(Thread.java:744)

This task waits for the iteration head, which has not been started yet, and
thus deadlocks.
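The blocking call near the top of that trace is an `ArrayBlockingQueue.take()` inside Flink's `Broker`. A minimal model of the hang using plain JDK classes (a sketch, not Flink code):

```java
import java.util.concurrent.ArrayBlockingQueue;

// The consumer parks in ArrayBlockingQueue.take() -- like Broker.get() --
// until a producer, standing in for the iteration head, publishes the
// shared object. If the head never starts, the consumer waits forever.
public class BrokerHangSketch {
    public static void main(String[] args) throws Exception {
        ArrayBlockingQueue<String> mediation = new ArrayBlockingQueue<>(1);

        Thread consumer = new Thread(() -> {
            try {
                String value = mediation.take(); // blocks until something is put
                System.out.println("got: " + value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        Thread.sleep(200);
        // The consumer is parked exactly as in the jstack output above.
        System.out.println("consumer state: " + consumer.getState());
        // Without this put() -- i.e. a never-started head -- the job hangs.
        mediation.put("superstep latch");
        consumer.join();
    }
}
```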

Opened a JIRA issue on it:
https://issues.apache.org/jira/browse/FLINK-1088

Thanks for the quick response by the way!



Re: Flink 0.6 hang up

Stephan Ewen
The solution suggested above would work for the SuperstepKickoffLatch, but
unfortunately not for the other broker structures.

So the problem persists in other cases.

A correct solution would be to prevent backpressure on forking flows, which
will come as part of the network-stack rewrite. That solves it properly, so I
would prefer to go for that solution.

A temporary workaround would be the following:
 - Find the data sets that are consumed both inside and outside the
iteration. These are typically preprocessed matrices or similar.
 - Duplicate the code that produces them, so that two different subprograms
produce separate data sets.
 - Use one data set inside the iteration and the other outside it.
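Why a forking flow can deadlock under backpressure can be modeled with plain JDK queues (a hedged sketch, not Flink code): one producer writes the same records into two bounded channels, one feeding the iteration and one feeding the outside consumer. If the iteration-side consumer is not running yet, its channel fills up, the producer blocks, and the outside path starves too. Duplicating the producing subprogram gives each path its own independent producer.

```java
import java.util.concurrent.ArrayBlockingQueue;

// One producer forks its output into two bounded channels.
// With nobody draining the "inside" channel, the producer blocks,
// so the "outside" channel stops receiving data as well.
public class ForkBackpressureSketch {
    public static void main(String[] args) throws Exception {
        ArrayBlockingQueue<Integer> inside = new ArrayBlockingQueue<>(2);
        ArrayBlockingQueue<Integer> outside = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    inside.put(i);   // blocks once 'inside' is full...
                    outside.put(i);  // ...so 'outside' stops receiving too
                }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        producer.start();

        // Nothing consumes yet (the iteration head never started):
        // the producer soon parks on a full channel.
        Thread.sleep(200);
        System.out.println("producer blocked: "
                + (producer.getState() == Thread.State.WAITING));

        // Draining both paths -- analogous to each path having its own
        // independent producer/consumer pair -- unblocks everything.
        Thread insideConsumer = new Thread(() -> {
            try { for (int i = 0; i < 10; i++) inside.take(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        Thread outsideConsumer = new Thread(() -> {
            try { for (int i = 0; i < 10; i++) outside.take(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        insideConsumer.start();
        outsideConsumer.start();
        producer.join();
        insideConsumer.join();
        outsideConsumer.join();
        System.out.println("done");
    }
}
```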

Stephan




On Thu, Sep 4, 2014 at 6:14 PM, Márton Balassi <[hidden email]>
wrote:

> Thanks, Ufuk found the relevant part in the stacktrace:
>
> "Join(Sends the rows of p with multiple keys)) (1/1)" daemon prio=10
> tid=0x00007f8928014800 nid=0x998 waiting on condition [0x00007f8912eed000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000007d2668ea0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>         at
> java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:374)
>         at
> org.apache.flink.runtime.iterative.concurrent.Broker.get(Broker.java:63)
>         at
>
> org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:84)
>         at
>
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:375)
>         at
>
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:265)
>         at java.lang.Thread.run(Thread.java:744)
>
> This part waits for the iteration head which has not been started yet and
> thus induces a deadlock.
>
> Opened a JIRA issue on it:
> https://issues.apache.org/jira/browse/FLINK-1088
>
> Thanks for the quick response by the way!
>
>
> On Thu, Sep 4, 2014 at 5:39 PM, Fabian Hueske <[hidden email]> wrote:
>
> > Hi Marton,
> >
> > a jstack Java stacktrace can help to identify where the code got stuck.
> > Can you open a JIRA and post a stacktrace there?
> >
> > Cheers, Fabian
> >
> >
> > 2014-09-04 17:25 GMT+02:00 Márton Balassi <[hidden email]>:
> >
> > > CPU load:
> > >
> > > Tested on my 4-core machine, the CPU load spikes at the beginning of the
> > > job and stays relatively high during the whole job when run with version
> > > 0.5, which then finishes gracefully. On version 0.6 it works seemingly
> > > well until the hangup. Interestingly enough, even when no more log
> > > messages appear, my CPU utilization stays 10-15% higher per core than
> > > without the job running.
> > >
> > > Logs:
> > >
> > > For both implementations it starts like this:
> > >
> > > 09/04/2014 17:05:51: Job execution switched to status SCHEDULED
> > > 09/04/2014 17:05:51: DataSource (CSV Input (|) /home/mbalassi/git/als-comparison/data/sampledb2b.csv.txt) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: Reduce(Create q as a random matrix) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: Join(Sends the columns of q with multiple keys) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: CoGroup (For fixed q calculates optimal p) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: Join(Sends the rows of p with multiple keys)) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: CoGroup (For fixed p calculates optimal q) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: Fake Tail (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: Join(Sends the columns of q with multiple keys) (1/1) switched to SCHEDULED
> > > 09/04/2014 17:05:51: CoGroup (For fixed q calculates optimal p) (1/1) switched to SCHEDULED
> > >
> > > [Omitted quite some healthy messages...]
> > >
> > > 09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1) switched to READY
> > > 09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1) switched to STARTING
> > > 09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1) switched to RUNNING
> > > 09/04/2014 17:05:53: CoGroup (For fixed p calculates optimal q) (1/1) switched to READY
> > > 09/04/2014 17:05:53: Fake Tail (1/1) switched to READY
> > > 09/04/2014 17:05:53: CoGroup (For fixed p calculates optimal q) (1/1) switched to STARTING
> > > 09/04/2014 17:05:53: Fake Tail (1/1) switched to STARTING
> > > 09/04/2014 17:05:54: CoGroup (For fixed p calculates optimal q) (1/1) switched to RUNNING
> > > 09/04/2014 17:05:54: Fake Tail (1/1) switched to RUNNING
> > > 09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1) switched to READY
> > > 09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1) switched to STARTING
> > > 09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1) switched to RUNNING
> > > 09/04/2014 17:05:54: CoGroup (For fixed q calculates optimal p) (1/1) switched to READY
> > > 09/04/2014 17:05:54: CoGroup (For fixed q calculates optimal p) (1/1) switched to STARTING
> > > 09/04/2014 17:05:55: CoGroup (For fixed q calculates optimal p) (1/1) switched to RUNNING
> > >
> > > Flink stops here, Strato continues:
> > >
> > > 09/04/2014 17:09:01: DataSource(CSV Input (|)) (1/1) switched to FINISHING
> > > 09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to READY
> > > 09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to STARTING
> > > 09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to RUNNING
> > > 09/04/2014 17:09:03: Reduce(Create q as a random matrix) (1/1) switched to FINISHING
> > > 09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to READY
> > > 09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to STARTING
> > > 09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to RUNNING
> > > 09/04/2014 17:09:09: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to FINISHING
> > > 09/04/2014 17:09:09: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@7ea742a1) (1/1) switched to READY
> > > 09/04/2014 17:09:09: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@7ea742a1) (1/1) switched to STARTING
> > >
> > > [Omitted quite some healthy messages...]
> > >
> > > 09/04/2014 17:09:10: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to FINISHED
> > > 09/04/2014 17:09:10: CoGroup(For fixed p calculates optimal q) (1/1) switched to FINISHED
> > > 09/04/2014 17:09:10: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3) (1/1) switched to RUNNING
> > > 09/04/2014 17:09:10: CoGroup(For fixed q calculates optimal p) (1/1) switched to FINISHING
> > > 09/04/2014 17:09:10: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3) (1/1) switched to FINISHING
> > > 09/04/2014 17:09:11: Join(Sends the columns of q with multiple keys) (1/1) switched to FINISHED
> > > 09/04/2014 17:09:11: CoGroup(For fixed q calculates optimal p) (1/1) switched to FINISHED
> > > 09/04/2014 17:09:11: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3) (1/1) switched to FINISHED
> > > 09/04/2014 17:09:11: Job execution switched to status FINISHED

Re: Flink 0.6 hang up

Márton Balassi
Thanks, the workaround works flawlessly on my side.
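
For reference, the duplication Stephan describes below amounts to building the shared data set twice, so the copy consumed inside the iteration and the copy consumed outside it are separate subprograms. A rough sketch against the 0.6-style Java API (schematic only, not compilable as-is; `buildQ`, `UpdateP`, `ComputeResidual`, and all other names are illustrative, not the actual als-comparison code):

```java
// Schematic sketch of the workaround, not the real job code.
// Build the pre-processed data set twice instead of sharing one instance:
DataSet<Tuple2<Integer, double[]>> qInside  = buildQ(env, inputPath); // consumed only inside the loop
DataSet<Tuple2<Integer, double[]>> qOutside = buildQ(env, inputPath); // consumed only outside the loop

IterativeDataSet<Tuple2<Integer, double[]>> loop = p.iterate(numSteps);
DataSet<Tuple2<Integer, double[]>> newP = loop
        .join(qInside).where(0).equalTo(0)   // inside-iteration consumer
        .with(new UpdateP());
DataSet<Tuple2<Integer, double[]>> finalP = loop.closeWith(newP);

finalP.join(qOutside).where(0).equalTo(0)    // outside-iteration consumer
      .with(new ComputeResidual())
      .writeAsCsv(outputPath);
```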


On Thu, Sep 4, 2014 at 6:49 PM, Stephan Ewen <[hidden email]> wrote:

> The above suggested solution would work for the SuperstepKickoffLatch, but
> unfortunately not for the other broker structures.
>
> So the problem does persist in other cases.
>
> A correct solution would be to prevent backpressure on forking flows, which
> will come as part of the network-stack rewrite. That solves it properly, and I
> would prefer to go for that solution.
>
> A temporary workaround would be the following:
>  - Find the data sets that are consumed both inside the iteration and
> outside the iteration. Those are typically preprocessed matrices or so.
>  - Duplicate that code to actually have two different subprograms
> (producing different data sets) for that.
>  - Use a different data set inside the iteration and outside the iteration.
>
> Stephan