Passing around huge hash sets

Andra Lungu
Hey!

It appears that my jobs have the same memory issue disguised as different
exceptions. This is expected: I am passing around hash sets of neighbors, and
for skewed graphs (i.e., with a lot of neighbors) it's bound to fail one way
or another.
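
A minimal sketch of why skewed graphs hurt: it serializes a single (vertex, neighbor set) record with plain java.io streams and fixed 8-byte IDs. This is an assumed stand-in, not Kryo's actual wire format (Kryo uses variable-length integers and class tags), and the class and method names are hypothetical. The point is only that one record's size grows linearly with the vertex degree, so a single hub vertex yields one very large record that is serialized as a unit.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration: approximates how large one (vertexId, neighborSet)
// record becomes once serialized. Plain DataOutputStream, NOT Kryo's format.
public class NeighborRecordSize {

    public static int serializedSize(long vertexId, Set<Long> neighbors) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(vertexId);         // 8 bytes for the vertex ID
            out.writeInt(neighbors.size());  // 4 bytes for the set length
            for (long n : neighbors) {
                out.writeLong(n);            // 8 bytes per neighbor
            }
        } catch (IOException e) {
            // ByteArrayOutputStream does not really throw, but the API declares it
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        for (int degree : new int[] {10, 1_000, 100_000}) {
            Set<Long> neighbors = new HashSet<>();
            for (long i = 0; i < degree; i++) {
                neighbors.add(i);
            }
            System.out.println("degree " + degree + " -> "
                    + serializedSize(42L, neighbors) + " bytes");
        }
    }
}
```

Under this encoding a record costs 12 + 8 × degree bytes, so a vertex with 100,000 neighbors already produces a single record of about 800 KB.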

This time Kryo hates me :(
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: Failed to serialize element. Serialized size (> 2166784 bytes) exceeds JVM heap space
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
    at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:261)
    at com.esotericsoftware.kryo.io.Output.writeInt(Output.java:251)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$IntSerializer.write(DefaultSerializers.java:115)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$IntSerializer.write(DefaultSerializers.java:109)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeMapSerializer.write(DefaultSerializers.java:593)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeMapSerializer.write(DefaultSerializers.java:589)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at util.DummyGraph$ApplyCoGroupToVertexValuesTuple2.coGroup(DummyGraph.java:368)
    at org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:722)


Is there anything I can do to increase the heap size?

Thanks in advance!
Andra

Re: Passing around huge hash sets

Fabian Hueske
Yes. Managed memory is taken from the TaskManager (TM) heap, so you can
decrease the amount of managed memory and/or increase the TM heap size to
leave more room for your own objects.

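See the following options in the configuration documentation:
- taskmanager.memory.size (absolute amount of managed memory in MB; if set, it overrides the fraction),
- taskmanager.memory.fraction (share of the heap reserved as managed memory for sorting, hash tables, and caching; default 0.7), and
- taskmanager.heap.mb (JVM heap size of each TaskManager in MB)

https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/config.html#common-options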
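
A rough back-of-the-envelope sketch of the trade-off above. It assumes managed memory is `taskmanager.memory.fraction` times the heap left after network buffers (or exactly `taskmanager.memory.size` when that is set), and it assumes 64 MB of network buffers (2048 buffers of 32 KB). The class and method names are hypothetical; the real TaskManager derives its numbers from the free heap at startup, so actual values will differ somewhat.

```java
// Hypothetical estimate, not Flink code. Assumes:
//   managed = fraction * (heap - network)   unless an absolute size is given
//   user heap = heap - network - managed
public class TaskManagerMemoryEstimate {

    /**
     * @param heapMb        taskmanager.heap.mb
     * @param networkMb     memory held by network buffers (assumed value)
     * @param fraction      taskmanager.memory.fraction
     * @param managedSizeMb taskmanager.memory.size, or -1 if unset
     * @return approximate heap in MB left for user code and serializers
     */
    public static long userHeapMb(long heapMb, long networkMb,
                                  double fraction, long managedSizeMb) {
        long afterNetwork = heapMb - networkMb;
        long managedMb = managedSizeMb >= 0
                ? managedSizeMb                         // absolute size wins if set
                : Math.round(afterNetwork * fraction);  // else a (rounded) fraction
        return afterNetwork - managedMb;
    }

    public static void main(String[] args) {
        System.out.println(userHeapMb(1024, 64, 0.7, -1)); // prints 288
        System.out.println(userHeapMb(1024, 64, 0.5, -1)); // prints 480
    }
}
```

Under these assumptions, lowering the fraction from 0.7 to 0.5 on a 1 GB heap raises the memory available to user objects from about 288 MB to about 480 MB; raising `taskmanager.heap.mb` helps in the same direction.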

Cheers, Fabian

2015-07-10 14:35 GMT+02:00 Andra Lungu <[hidden email]>:

Re: Passing around huge hash sets

Stephan Ewen
There is also an inherent limit to this style of passing massive amounts of
data about neighbors: such algorithms are often limited in their scalability
as well.

At some point you may need to switch to probabilistic data structures, or
simply to a different method.
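
One possible probabilistic structure, offered as an illustration rather than anything from Flink or Gelly, is a Bloom filter: it summarizes a neighbor set in a fixed number of bits, so the record size no longer depends on the degree. The class name, the sizing (100,000 bits, 7 hash functions), and the SplitMix64-based double hashing are assumptions for this sketch. It only answers "might this vertex be a neighbor?" with a tunable false-positive rate, so it suits membership or intersection-style checks, not algorithms that need the exact neighbor list.

```java
import java.util.BitSet;

// Hypothetical example, not part of Flink/Gelly: a Bloom filter over long
// vertex IDs. False positives are possible; false negatives are not.
public class NeighborBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    public NeighborBloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // SplitMix64 finalizer: scrambles (possibly sequential) vertex IDs.
    private static long mix(long z) {
        z = (z ^ (z >>> 30)) * 0xbf58476d1ce4e5b9L;
        z = (z ^ (z >>> 27)) * 0x94d049bb133111ebL;
        return z ^ (z >>> 31);
    }

    // i-th bit position via double hashing: h1 + i * h2 (mod numBits).
    private int position(long hash, int i) {
        int h1 = (int) hash;
        int h2 = (int) (hash >>> 32) | 1; // force an odd, non-zero step
        return Math.floorMod(h1 + i * h2, numBits);
    }

    public void add(long neighborId) {
        long hash = mix(neighborId);
        for (int i = 0; i < numHashes; i++) {
            bits.set(position(hash, i));
        }
    }

    public boolean mightContain(long neighborId) {
        long hash = mix(neighborId);
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(position(hash, i))) {
                return false; // definitely not a neighbor
            }
        }
        return true; // probably a neighbor
    }

    // Footprint is fixed by numBits, independent of the vertex degree.
    public int sizeInBytes() {
        return (numBits + 7) / 8;
    }

    public static void main(String[] args) {
        NeighborBloomFilter hub = new NeighborBloomFilter(100_000, 7);
        for (long id = 0; id < 10_000; id++) {
            hub.add(id);
        }
        int falsePositives = 0;
        for (long id = 10_000; id < 20_000; id++) {
            if (hub.mightContain(id)) {
                falsePositives++;
            }
        }
        System.out.println("bytes: " + hub.sizeInBytes()
                + ", false positives among 10000 non-neighbors: " + falsePositives);
    }
}
```

With about 10 bits per neighbor and 7 hash functions, the textbook estimate (1 - e^(-kn/m))^k gives a false-positive rate of roughly 0.8%. Adding more neighbors to the same 12,500 bytes raises that rate instead of the record size, which is exactly the trade the suggestion above is about.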

On Fri, Jul 10, 2015 at 2:54 PM, Fabian Hueske <[hidden email]> wrote:
