Serialization problem in Flink integration to SAMOA

F. Beligianni
Hello,

I am currently working on integrating the Flink Streaming API into
SAMOA, and I am running into an exception thrown from the Kryo
serializer:

Caused by: java.lang.ArrayIndexOutOfBoundsException
    at java.lang.System.arraycopy(Native Method)
    at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:238)
    at org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read(SpillingAdaptiveSpanningRecordDeserializer.java:410)
    at org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)
    at com.esotericsoftware.kryo.io.Input.fill(Input.java:134)
    at com.esotericsoftware.kryo.io.Input.require(Input.java:154)
    at com.esotericsoftware.kryo.io.Input.readInt(Input.java:303)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:103)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:596)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:707)
    at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:195)


Specifically, I am working with Flink-0.9-SNAPSHOT. The exception is
thrown in the custom class "FlinkProcessingItem", which extends
"StreamInvokable", inside the "invoke" function at the point where the
readNext() function of StreamInvokable is called.

The object that "readNext" is supposed to return is a custom Tuple3
subclass called SamoaType, defined like this:
"SamoaType extends Tuple3<String, ContentEvent, String>", where
ContentEvent is a SAMOA interface.

The type information for the custom SamoaType is attached to the source
using "TypeExtractor.getForObject".

The ContentEvent object that is sent between the two Invokables is of type
"InstanceContentEvent", which implements ContentEvent. Its source is at the
following link:
<https://github.com/yahoo/samoa/blob/master/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstanceContentEvent.java>

We managed to reproduce the exception in the following test program,
TestSerialization:
<https://github.com/senorcarbone/samoa/commit/9eba049031aee85d1bef58dcdaf37110b9fe4505>


Lastly, I should mention that the same example runs on Storm, even though
Storm also uses Kryo.

Thank you,
Fay

Re: Serialization problem in Flink integration to SAMOA

Paris Carbone
FYI,

The problem seems to be that samoa-api uses Kryo 2.17, while Flink uses Kryo 2.24.0. All Flink-related tests pass if I upgrade SAMOA's Kryo dependency to 2.24.0. You could also ask on the samoa-incubating dev list whether that change is OK. It would probably be good to test the same version on Storm, Samza and S4 as well, to be sure.
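
One way to check for this kind of mismatch is to print where the Kryo class is actually loaded from and whether more than one copy is visible on the classpath. The following is only a minimal sketch using standard JDK calls; the class name KryoVersionCheck and its helper methods are made up for illustration and are not part of SAMOA or Flink.

```java
import java.io.IOException;
import java.net.URL;
import java.security.CodeSource;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of SAMOA, Flink or Kryo.
public class KryoVersionCheck {

    /** Returns the location (usually a jar URL) a class was loaded from. */
    static String locationOf(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                // JDK classes typically have no code source.
                return "unknown (no code source)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "not on classpath";
        }
    }

    /** Lists every copy of a class file visible to the system class loader. */
    static List<URL> copiesOf(String resourcePath) throws IOException {
        return Collections.list(
                ClassLoader.getSystemClassLoader().getResources(resourcePath));
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Kryo loaded from: "
                + locationOf("com.esotericsoftware.kryo.Kryo"));
        // More than one entry here suggests two Kryo jars are competing.
        for (URL url : copiesOf("com/esotericsoftware/kryo/Kryo.class")) {
            System.out.println("Kryo copy found at: " + url);
        }
    }
}
```

Running this with the same classpath as the failing job should show which Kryo jar wins; the jar file name normally carries the version number.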

Paris


> On 28 Jan 2015, at 12:52, F. Beligianni <[hidden email]> wrote:
> [...]