ALS implementation

16 messages
ALS implementation

Felix Neutatz
Hi,

I played a bit with the ALS recommender algorithm, using the MovieLens
dataset: http://files.grouplens.org/datasets/movielens/ml-latest-README.html

The rating matrix has 21,063,128 entries (ratings).

I ran the algorithm with three configurations:

1. Default JVM heap size:

val als = ALS()
   .setIterations(10)
   .setNumFactors(10)
   .setBlocks(100)

throws:
java.lang.RuntimeException: Hash Join bug in memory management: Memory buffers leaked.
at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

2. 5 GB JVM heap:

val als = ALS()
   .setIterations(10)
   .setNumFactors(10)
   .setBlocks(150)

throws:

java.lang.NullPointerException
at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

3. 14 GB JVM heap:

val als = ALS()
   .setIterations(10)
   .setNumFactors(10)
   .setBlocks(150)
   .setTemporaryPath("/tmp/tmpALS")

-> works
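
For context, the working configuration (3) embedded in a fuller program might look like the sketch below. The I/O scaffolding (ExecutionEnvironment, CSV path, field layout) is an assumption added for illustration and is not from the original mail; only the ALS setters match the post.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.recommendation.ALS

val env = ExecutionEnvironment.getExecutionEnvironment

// (userID, itemID, rating) triples; the file path and column layout are
// hypothetical -- adapt them to the actual MovieLens ratings file.
val ratings: DataSet[(Int, Int, Double)] =
  env.readCsvFile[(Int, Int, Double)]("ml-latest/ratings.csv")

val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(150)
  .setTemporaryPath("/tmp/tmpALS") // spill intermediate results to disk

als.fit(ratings)
```

Setting a temporary path makes ALS persist intermediate results instead of keeping the whole pipeline in managed memory, which is presumably why this variant survives where the others fail.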

Is this a Flink problem or is it just my bad configuration?

Best regards,
Felix

Re: ALS implementation

Ufuk Celebi
I think both are bugs. They are triggered by the different memory
configurations.

@chiwan: is the 2nd error fixed by your recent change?

@felix: if yes, can you try the 2nd run again with the changes?

On Thursday, June 4, 2015, Felix Neutatz <[hidden email]> wrote:

> [quoted message trimmed]

Re: ALS implementation

Chiwan Park
Hi. The second bug is fixed by the recent change in the PR,
but there is no test case for the first bug yet.

Regards,
Chiwan Park

> On Jun 4, 2015, at 5:09 PM, Ufuk Celebi <[hidden email]> wrote:
>
> [quoted messages trimmed]






Re: ALS implementation

Till Rohrmann
If the first error is not fixed by Chiwan's PR, then we should create a
JIRA issue for it so we don't forget it.

@Felix: Chiwan's PR is here [1]. Could you try to run ALS again with this
version?

Cheers,
Till

[1] https://github.com/apache/flink/pull/751

On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park <[hidden email]> wrote:

> [quoted messages trimmed]

Re: ALS implementation

Felix Neutatz
Yes, I will try it again with the newest update :)

2015-06-04 10:17 GMT+02:00 Till Rohrmann <[hidden email]>:

> [quoted messages trimmed]

Re: ALS implementation

Felix Neutatz
After the bug fix:

For 100 blocks and the default JVM heap:

Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident. Probably cause: Too many duplicate keys.
at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)


For 150 blocks and a 5 GB JVM heap:

Caused by: java.lang.NullPointerException
at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
...

Best regards,
Felix

2015-06-04 10:19 GMT+02:00 Felix Neutatz <[hidden email]>:

> [quoted messages trimmed]

Re: ALS implementation

Andra Lungu
Hi Felix,

Passing a JoinHint to your join call should help. See:
http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3CCANC1h_vFFbQyyiKTzCDPihn09r4HE4OLuiuRSJnci_rWc+cccA@...%3E
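
For illustration, a join strategy hint in the DataSet API looks roughly like this. The data sets and key indices below are made up, and the exact Scala-API overload accepting a JoinHint should be verified against the Flink version in use; `joinWithTiny`/`joinWithHuge` are the older size-hint variants.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint

// Hypothetical inputs; only the join-hint mechanics matter here.
val users: DataSet[(Int, String)] = ???
val ratings: DataSet[(Int, Double)] = ???

// Ask for a sort-merge join instead of the failing hash join.
val joined = users
  .join(ratings, JoinHint.REPARTITION_SORT_MERGE)
  .where(0)
  .equalTo(0)

// Alternatively, hint relative input sizes:
// users.joinWithHuge(ratings).where(0).equalTo(0)
```
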

Cheers,
Andra

On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz <[hidden email]>
wrote:

> [quoted messages trimmed]

Re: ALS implementation

Felix Neutatz
Now the question is which join in the ALS implementation is the problem :)

2015-06-04 19:09 GMT+02:00 Andra Lungu <[hidden email]>:

> Hi Felix,
>
> Passing a JoinHint to your function should help.
> see:
> http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3CCANC1h_vFFbQyyiKTzCDPihn09r4HE4OLuiuRSJnci_rWc+cccA@...%3E
>
> Cheers,
> Andra
>
> On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz <[hidden email]> wrote:
>
>> after bug fix:
>>
>> for 100 blocks and standard jvm heap space
>>
>> Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of
>> recursions, without reducing partitions enough to be memory resident.
>> Probably cause: Too many duplicate keys.
>> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
>> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
>> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
>> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> for 150 blocks and 5G jvm heap space
>>
>> Caused by: java.lang.NullPointerException
>> at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
>> ...
>>
>> Best regards,
>> Felix
>>
>> 2015-06-04 10:19 GMT+02:00 Felix Neutatz <[hidden email]>:
>>
>>> Yes, I will try it again with the newest update :)
>>>
>>> 2015-06-04 10:17 GMT+02:00 Till Rohrmann <[hidden email]>:
>>>
>>>> If the first error is not fixed by Chiwans PR, then we should create a
>>>> JIRA for it to not forget it.
>>>>
>>>> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with this
>>>> version?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> [1] https://github.com/apache/flink/pull/751
>>>>
>>>> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park <[hidden email]> wrote:
>>>>
>>>>> Hi. The second bug is fixed by the recent change in PR.
>>>>> But there is just no test case for first bug.
>>>>>
>>>>> Regards,
>>>>> Chiwan Park
>>>>>
>>>>>> On Jun 4, 2015, at 5:09 PM, Ufuk Celebi <[hidden email]> wrote:
>>>>>>
>>>>>> I think both are bugs. They are triggered by the different memory
>>>>>> configurations.
>>>>>>
>>>>>> @chiwan: is the 2nd error fixed by your recent change?
>>>>>>
>>>>>> @felix: if yes, can you try the 2nd run again with the changes?
Reply | Threaded
Open this post in threaded view
|

Re: ALS implementation

Chiwan Park
I think the NPE in the second configuration is a bug in the HashTable.
I just found that ConnectedComponents with small memory segments causes the same error. (I thought I had fixed the bug, but it is still alive.)

Regards,
Chiwan Park
 


Re: ALS implementation

till.rohrmann
In reply to this post by Andra Lungu
I think it is not a problem of join hints, but rather of too little memory
for the join operator. If you set the temporary directory, the job is split
into smaller parts and each operator gets more memory. Alternatively, you
can increase the memory you give to the TaskManagers.

The problem with the NullPointerException won't be solved by this, though.
Could you send the full stack trace for that?

Cheers,
Till
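
[Editor's note: a minimal sketch of the second suggestion, assuming the 0.9-era Flink configuration keys; the values below are purely illustrative, not tuned recommendations.]

```
# flink-conf.yaml (illustrative values)
taskmanager.heap.mb: 14336        # heap given to each TaskManager JVM
taskmanager.memory.fraction: 0.7  # share of free heap handed to Flink's managed memory (joins, sorts)
```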

Re: ALS implementation

Felix Neutatz
Shouldn't Flink figure out on its own how much memory there is for the
join?

The detailed stack trace for the NullPointerException can be found here:
https://github.com/FelixNeutatz/IMPRO-3.SS15/blob/8b679f1c2808a2c6d6900824409fbd47e8bed826/NullPointerException.txt

Best regards,
Felix


Re: ALS implementation

Fabian Hueske-2
Hi, the problem with the "maximum number of recursions" is the distribution
of join keys.

If a partition does not fit into memory, HybridHashJoin tries to solve this
problem by recursively partitioning the partition using a different hash
function.
If the join keys are heavily skewed, this strategy can fail: hashing the
same value with a different hash function still puts all of its duplicates
into the same partition, so the partition remains too large to fit into
memory and another recursion is started. At some point, the HybridHashJoin
gives up and throws the "too many recursions" exception.
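The effect is easy to see outside of Flink: no matter which hash function is used, all duplicates of one key land in the same sub-partition. A minimal pure-Scala sketch (the hash functions, key values, and partition counts are illustrative, not Flink's actual ones):

```scala
// Why recursive re-partitioning cannot shrink a partition full of duplicate keys.
object SkewDemo extends App {
  // Split keys into numPartitions buckets using the given hash function.
  def partition(keys: Seq[Int], hash: Int => Int, numPartitions: Int): Map[Int, Seq[Int]] =
    keys.groupBy(k => math.floorMod(hash(k), numPartitions))

  // Heavily skewed input: 1000 duplicates of key 42 plus a few other keys.
  val skewed = Seq.fill(1000)(42) ++ Seq(1, 3, 5)

  val hash1: Int => Int = _ * 31
  val hash2: Int => Int = k => k * 131 + 7 // the "different" hash of the recursion step

  // First pass produces one oversized partition holding every copy of 42.
  val oversized = partition(skewed, hash1, 4).values.maxBy(_.size)

  // Recursing with a different hash function does not split the duplicates.
  val recursed = partition(oversized, hash2, 4).values.map(_.size).max
  println(recursed) // prints 1000: all copies of 42 still share one bucket
}
```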

There are three solutions to this problem:
1. use more memory
2. use a sort-merge join
3. switch the inputs of the hash-join
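Options 2 and 3 can be expressed with an explicit join hint. A hedged sketch against the Scala DataSet API (assumptions: the `JoinHint` overload of `join`; `ratings` and `factors` are made-up inputs for illustration, not the actual ALS internals):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint

object JoinHintSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Illustrative inputs: ratings carry many duplicate keys, factors are unique.
    val ratings = env.fromElements((1, 5.0), (1, 3.0), (1, 4.0), (2, 2.0))
    val factors = env.fromElements((1, 0.5), (2, 0.7))

    // Option 2: sidestep the hash join with a sort-merge strategy.
    val merged = ratings.join(factors, JoinHint.REPARTITION_SORT_MERGE)
      .where(0).equalTo(0)

    // Option 3: keep the hash join, but build the hash table on the
    // duplicate-free second input and probe with the skewed first input.
    val hashed = ratings.join(factors, JoinHint.REPARTITION_HASH_SECOND)
      .where(0).equalTo(0)

    merged.print()
  }
}
```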

Cheers, Fabian

2015-06-05 9:13 GMT+02:00 Felix Neutatz <[hidden email]>:

> Shouldn't Flink figure it out on its own, how much memory there is for the
> join?
>
> The detailed trace for the Nullpointer exception can be found here:
>
> https://github.com/FelixNeutatz/IMPRO-3.SS15/blob/8b679f1c2808a2c6d6900824409fbd47e8bed826/NullPointerException.txt
>
> Best regards,
> Felix
>
> 2015-06-04 19:41 GMT+02:00 Till Rohrmann <[hidden email]>:
>
> > I think it is not a problem of join hints, but rather of too little memory
> > for the join operator. If you set the temporary directory, then the job
> > will be split in smaller parts and thus each operator gets more memory.
> > Alternatively, you can increase the memory you give to the Task Managers.
> >
> > The problem with the NullPointerException won't be solved by this, though.
> > Could you send the full stack trace for that?
> > Could you send the full stack trace for that?
> >
> > Cheers,
> > Till
> > On Jun 4, 2015 7:10 PM, "Andra Lungu" <[hidden email]> wrote:
> >
> > > Hi Felix,
> > >
> > > Passing a JoinHint to your function should help.
> > > see:
> > > http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3CCANC1h_vFFbQyyiKTzCDPihn09r4HE4OLuiuRSJnci_rWc+cccA@...%3E
> > >
> > > Cheers,
> > > Andra
> > >
> > > On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz <[hidden email]>
> > > wrote:
> > >
> > > > after bug fix:
> > > >
> > > > for 100 blocks and standard jvm heap space
> > > >
> > > > Caused by: java.lang.RuntimeException: Hash join exceeded maximum number
> > > > of recursions, without reducing partitions enough to be memory resident.
> > > > Probably cause: Too many duplicate keys.
> > > > at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
> > > > at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
> > > > at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
> > > > at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > > > at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > > > at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > > at java.lang.Thread.run(Thread.java:745)
> > > >
> > > > for 150 blocks and 5G jvm heap space
> > > >
> > > > Caused by: java.lang.NullPointerException
> > > > at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> > > > ...
> > > >
> > > > Best regards,
> > > > Felix
> > > >
> > > > 2015-06-04 10:19 GMT+02:00 Felix Neutatz <[hidden email]>:
> > > >
> > > > > Yes, I will try it again with the newest update :)
> > > > >
> > > > > 2015-06-04 10:17 GMT+02:00 Till Rohrmann <[hidden email]>:
> > > > >
> > > > >> If the first error is not fixed by Chiwans PR, then we should create a
> > > > >> JIRA for it to not forget it.
> > > > >>
> > > > >> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with this
> > > > >> version?
> > > > >>
> > > > >> Cheers,
> > > > >> Till
> > > > >>
> > > > >> [1] https://github.com/apache/flink/pull/751
> > > > >>
> > > > >> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park <[hidden email]> wrote:
> > > > >>
> > > > >> > Hi. The second bug is fixed by the recent change in PR.
> > > > >> > But there is just no test case for first bug.
> > > > >> >
> > > > >> > Regards,
> > > > >> > Chiwan Park
> > > > >> >
> > > > >> > > On Jun 4, 2015, at 5:09 PM, Ufuk Celebi <[hidden email]> wrote:
> > > > >> > >
> > > > >> > > I think both are bugs. They are triggered by the different memory
> > > > >> > > configurations.
> > > > >> > >
> > > > >> > > @chiwan: is the 2nd error fixed by your recent change?
> > > > >> > >
> > > > >> > > @felix: if yes, can you try the 2nd run again with the changes?
> > > > >> > >
> > > > >> > > On Thursday, June 4, 2015, Felix Neutatz <[hidden email]> wrote:

Re: ALS implementation

Stephan Ewen
In reply to this post by Felix Neutatz
There are two different issues here:

1) Flink does figure out how much memory a join gets, but that memory may
be too little for the join to accept it. Flink plans very conservatively
right now - often too conservatively - which is something we have on the
immediate roadmap to fix.

2) The "Hash Join exceeded recursions" problem is made worse by scarce
memory, but is usually an indicator that the join is running the wrong way
anyway. The side with many duplicates should rarely be the build side; in
most cases it should be the probe side.
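The build/probe distinction can be sketched in plain Scala as a toy in-memory analogy (not Flink's actual MutableHashTable; a real hash join table also keeps duplicate build keys, which a Map would collapse). The build side is materialized as a hash table, so it should be the small, duplicate-free input; the skewed side only probes it:

```scala
object BuildProbeDemo extends App {
  // Build side: few records, unique keys -> the hash table stays small.
  val factors = Seq(1 -> 0.5, 2 -> 0.7)

  // Probe side: many records, heavily duplicated keys.
  val ratings = Seq(1 -> 5.0, 1 -> 3.0, 1 -> 4.0, 2 -> 2.0)

  // Materialize only the duplicate-free side in memory.
  val buildTable: Map[Int, Double] = factors.toMap

  // Each probe record is looked up once; duplicates on this side are harmless.
  val joined = ratings.flatMap { case (id, r) =>
    buildTable.get(id).map(f => (id, r, f))
  }

  println(joined.size) // prints 4: every rating found its factor
}
```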


Stephan




On Fri, Jun 5, 2015 at 9:13 AM, Felix Neutatz <[hidden email]>
wrote:

> Shouldn't Flink figure it out on its own, how much memory there is for the
> join?
>
> The detailed trace for the Nullpointer exception can be found here:
>
> https://github.com/FelixNeutatz/IMPRO-3.SS15/blob/8b679f1c2808a2c6d6900824409fbd47e8bed826/NullPointerException.txt
>
> Best regards,
> Felix
>

Re: ALS implementation

Till Rohrmann
I'll look into it to find the responsible join operation.
On Jun 5, 2015 10:50 AM, "Stephan Ewen" <[hidden email]> wrote:

> There are two different issues here:
>
> 1) Flink does figure out how much memory a join gets, but that memory may
> be too little for the join to accept it. Flink plans highly conservative
> right now - too conservative often, which is something we have on the
> immediate roadmap to fix.
>
> 2) The "Hash Join exceeded recursions" problems is made worse by little
> memory, but is usually an indicator that the join is running the wrong way
> anyways. The side with many duplicates should rarely be the build side, but
> in most cases the probe side.
>
>
> Stephan

Re: ALS implementation

Till Rohrmann
Hi Felix, I tried to reproduce the problem with the *Hash join exceeded
maximum number of recursions, without reducing partitions enough to be
memory resident.* exception. I used the same data set and the same settings
for ALS. However, on my machine it runs through without this exception.
Could you tell me what computer you're running the program on? How many
cores does it have, and how much memory do you give the JVM as a standard
value?

What does your program look like? Do you only calculate the factorization
and print it, or do you also try to calculate predictions?

I found only one join operation where this exception could be thrown,
namely the predict operation. However, the join key should be unique for
the build side at this position, because you join the user vectors with
the user IDs of your query there. All other joins use the merge strategy.
Is it correct that the factorization completed before the exception
occurred? You can see that in the output if the 10th iteration has been
reached.

Maybe you can also send us the execution plan:
println(env.getExecutionPlan()).

What we could try is to insert a join hint for the respective join
operation and see whether this solves your problem.

Cheers,
Till
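
As a minimal sketch of the two suggestions above (assuming the Flink 0.9-era Scala DataSet API; the input data sets and the output path are hypothetical placeholders, not the actual ALS internals):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint

val env = ExecutionEnvironment.getExecutionEnvironment

// Hypothetical inputs standing in for the rating entries and user factors.
val ratings: DataSet[(Int, Int, Double)] = env.fromElements((1, 1, 5.0))
val userFactors: DataSet[(Int, Array[Double])] =
  env.fromElements((1, Array(0.1, 0.2)))

// A join hint forces the strategy, e.g. sort-merge instead of hybrid hash:
val joined = ratings
  .join(userFactors, JoinHint.REPARTITION_SORT_MERGE)
  .where(0)
  .equalTo(0)

// A sink must exist before a plan can be generated.
joined.writeAsText("/tmp/joined")

// Inspect the optimizer's chosen plan without executing the job:
println(env.getExecutionPlan())
```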


On Sat, Jun 6, 2015 at 1:32 AM Till Rohrmann <[hidden email]> wrote:

> I'll look into it to find the responsible join operation.
> On Jun 5, 2015 10:50 AM, "Stephan Ewen" <[hidden email]> wrote:
>
>> There are two different issues here:
>>
>> 1) Flink does figure out how much memory a join gets, but that memory may
>> be too little for the join to accept it. Flink plans highly conservatively
>> right now - too conservatively often, which is something we have on the
>> immediate roadmap to fix.
>>
>> 2) The "Hash Join exceeded recursions" problem is made worse by little
>> memory, but is usually an indicator that the join is running the wrong way
>> anyways. The side with many duplicates should rarely be the build side, but
>> in most cases the probe side.
>>
>> Stephan
>>
>> On Fri, Jun 5, 2015 at 9:13 AM, Felix Neutatz <[hidden email]> wrote:
>>
>>> Shouldn't Flink figure out on its own how much memory there is for the
>>> join?
>>>
>>> The detailed trace for the NullPointerException can be found here:
>>> https://github.com/FelixNeutatz/IMPRO-3.SS15/blob/8b679f1c2808a2c6d6900824409fbd47e8bed826/NullPointerException.txt
>>>
>>> Best regards,
>>> Felix
>>>
>>> 2015-06-04 19:41 GMT+02:00 Till Rohrmann <[hidden email]>:
>>>
>>>> I think it is not a problem of join hints, but rather of too little
>>>> memory for the join operator. If you set the temporary directory, then
>>>> the job will be split into smaller parts and thus each operator gets
>>>> more memory. Alternatively, you can increase the memory you give to the
>>>> Task Managers.
>>>>
>>>> The problem with the NullPointerException won't be solved by this,
>>>> though. Could you send the full stack trace for that?
>>>>
>>>> Cheers,
>>>> Till
>>>> On Jun 4, 2015 7:10 PM, "Andra Lungu" <[hidden email]> wrote:
>>>>
>>>>> Hi Felix,
>>>>>
>>>>> Passing a JoinHint to your function should help.
>>>>> see:
>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3CCANC1h_vFFbQyyiKTzCDPihn09r4HE4OLuiuRSJnci_rWc+cccA@...%3E
>>>>>
>>>>> Cheers,
>>>>> Andra
>>>>>
>>>>> On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz <[hidden email]> wrote:
>>>>>
>>>>>> after bug fix:
>>>>>>
>>>>>> for 100 blocks and standard jvm heap space
>>>>>>
>>>>>> Caused by: java.lang.RuntimeException: Hash join exceeded maximum
>>>>>> number of recursions, without reducing partitions enough to be memory
>>>>>> resident. Probably cause: Too many duplicate keys.
>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
>>>>>> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
>>>>>> at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>>>>>> at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> for 150 blocks and 5G jvm heap space
>>>>>>
>>>>>> Caused by: java.lang.NullPointerException
>>>>>> at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
>>>>>> ...
>>>>>>
>>>>>> Best regards,
>>>>>> Felix
>>>>>>
>>>>>> 2015-06-04 10:19 GMT+02:00 Felix Neutatz <[hidden email]>:
>>>>>>
>>>>>>> Yes, I will try it again with the newest update :)
>>>>>>>
>>>>>>> 2015-06-04 10:17 GMT+02:00 Till Rohrmann <[hidden email]>:
>>>>>>>
>>>>>>>> If the first error is not fixed by Chiwan's PR, then we should
>>>>>>>> create a JIRA for it so we don't forget it.
>>>>>>>>
>>>>>>>> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again
>>>>>>>> with this version?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/flink/pull/751
>>>>>>>>
>>>>>>>> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park <[hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Hi. The second bug is fixed by the recent change in the PR.
>>>>>>>>> But there is just no test case for the first bug.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Chiwan Park
>>>>>>>>>
>>>>>>>>>> On Jun 4, 2015, at 5:09 PM, Ufuk Celebi <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>> I think both are bugs. They are triggered by the different memory
>>>>>>>>>> configurations.
>>>>>>>>>>
>>>>>>>>>> @chiwan: is the 2nd error fixed by your recent change?
>>>>>>>>>>
>>>>>>>>>> @felix: if yes, can you try the 2nd run again with the changes?

Re: ALS implementation

Till Rohrmann
I think I have found the likely cause. I suspect that the empirical risk
calculation triggers the *Hash join exceeded maximum number of recursions*
problem. For this calculation you provide the training data set
DataSet[(Int, Int, Double)] and calculate the predicted rating value for
each item. In order to do this, you first join each rating value with the
user factors (the join key is the first field) and then join the result
with the item factors (the join key is the second field). For the second
join operation you basically have a DataSet[(Int, Int, UserFactors)], with
UserFactors being an array of doubles, as one of the joining data sets.
This data set has roughly the size #ratings * #latentFactors *
sizeof(Double) and contains many entries with an identical key
(= #ratingEntries for each item).

I assume that the only way to make this robust is to force a sort-merge
join strategy here, because one cannot get rid of the duplicate join keys.

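The size estimate above can be sanity-checked with quick arithmetic (a sketch; it assumes 8 bytes per Double payload and ignores per-record overhead such as the two Int key fields and array headers):

```scala
// Back-of-envelope size of the build side for the second join:
// one (Int, Int, UserFactors) record per rating, where UserFactors
// holds #latentFactors doubles (8 bytes each, payload only).
val ratings = 21063128L      // movielens rating count from the thread
val latentFactors = 10L      // setNumFactors(10)
val bytesPerDouble = 8L

val buildSideBytes = ratings * latentFactors * bytesPerDouble
val gib = buildSideBytes.toDouble / (1024L * 1024L * 1024L)

println(s"$buildSideBytes bytes (~${math.round(gib * 100) / 100.0} GiB)")
```

That is roughly 1.6 GiB of raw factor payload alone, concentrated on comparatively few distinct item keys, which is consistent with the hash join spilling and recursing.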
