Hello,
I'm getting this error while running a streaming module on a cluster of 3 nodes:

java.lang.ArrayIndexOutOfBoundsException: 32768
    at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:178)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:214)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:219)
    at org.apache.flink.types.StringValue.readString(StringValue.java:764)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
    at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
    at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

Here's the configuration for each node:

jobmanager.heap.mb: 2048
taskmanager.heap.mb: 4096
taskmanager.numberOfTaskSlots: 5

I'm not even sure where to start with this one, so any help is appreciated.

Thanks,
Ali
Hi Ali,
this could be a bug in Flink. Can you share the code of your program with us to debug the issue?
I agree with Robert. Looks like a bug in Flink.
Maybe an off-by-one issue: the violating index is 32768 and the default memory segment size is 32 KB.

Which Flink version are you using? In case you are using a custom build, can you share the commit ID (it is reported in the first lines of the JobManager log file)?

Thanks, Fabian
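P.S. To make the suspected off-by-one concrete, here is a tiny sketch of the arithmetic (plain Java for illustration only, not Flink's actual MemorySegment code):

public class OffByOneSketch {
    public static void main(String[] args) {
        // 32 KB = 32 * 1024 = 32768 bytes, so valid offsets are 0 .. 32767.
        byte[] segment = new byte[32 * 1024];
        // Reading at offset 32768 is one past the end of the segment:
        byte b = segment[32768]; // throws ArrayIndexOutOfBoundsException: 32768
        System.out.println(b);
    }
}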
Thanks for reporting this. Are you using any custom data types?
If you can share your code, it would be very helpful in order to debug this.

– Ufuk
Thanks for the quick reply, guys! A lot of interest in this one. I've attached the source code. There are other supporting modules/classes, but the main Flink component is in the included zip file.

In answer to Fabian's question: I'm using the 0.9.1 release right off the website (flink-0.9.1-bin-hadoop1.tgz).

In answer to Ufuk's question: Yes, I'm using custom data types.

Thanks,
Ali
Hey Ali,
thanks for sharing the code. I assume that the custom ProtocolEvent, ProtocolDetailMap, and Subscriber types are all POJOs. They should not be a problem. I think this is a bug in Flink 0.9.1.

Is it possible to re-run your program with the upcoming 0.10.0 (RC8) version and report back?

1) Add https://repository.apache.org/content/repositories/orgapacheflink-1055 as a snapshot repository:

<repositories>
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Development Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/orgapacheflink-1055</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

2) Set the Flink dependency version to 0.10.0 (see the P.S. below for a sketch).

3) Use the Flink binary matching your Hadoop installation from here: http://people.apache.org/~mxm/flink-0.10.0-rc8/ (since you use Java, you can go with the Scala 2.10 builds).

Sorry for the inconvenience! The release is about to be finished (the voting process is already going on).

– Ufuk
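P.S. For step 2, a minimal sketch of the dependency change (the artifact ID below is an assumption based on a typical Java streaming setup; keep whatever artifacts your pom already declares and only bump the version):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>0.10.0</version>
</dependency>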
Hi Ali,
one more thing. Did that error occur once or is it reproducible?

Thanks for your help,
Fabian
Hi Ali,
I looked into this issue. The problem seems to be caused by the deserializer reading more data than it should. This might happen for two reasons:

1) the meta information about how much data is safe to read is incorrect.
2) the serializer and deserializer logic are not in sync, which can cause the deserializer to read more data than the serializer wrote.

The first case is less likely: Flink writes the binary length of each record in front of its serialized representation. This happens whenever data is sent over the network, regardless of the data type. A bug in this part would be very crucial, but it is also less likely because this code path is exercised very often and the problem has not occurred before.

IMO, this looks like an issue in the serialization logic. Looking at your code, the problem occurs when deserializing ProtocolEvent objects. Is it possible that you share this class with me?

If it is not possible to share the class, it would be good to know the field types of the POJO and the associated TypeInformation. For that you can run the code in this gist [1], which will recursively print the field types and their TypeInformation.

As a temporary workaround, you can try to use Kryo to serialize and deserialize your POJOs as follows (a streaming variant is sketched in the P.S. below):

ExecutionEnvironment env = ...
env.getConfig().enableForceKryo();

Best,
Fabian

[1] https://gist.github.com/fhueske/6c5aa386fc79ab69712b
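P.S. Since your module is a streaming job, here is a minimal sketch of the same workaround through the streaming environment (assuming the streaming environment exposes the same ExecutionConfig switch; adapt to your actual topology):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForceKryoSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Force Kryo for POJO types instead of Flink's PojoSerializer.
        env.getConfig().enableForceKryo();

        // Tiny placeholder pipeline just to make the sketch runnable;
        // your real sources and operators go here instead.
        env.fromElements("a", "b", "c").print();
        env.execute("force-kryo sketch");
    }
}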
Fabian,
I tried running it again and I noticed there were some more exceptions in the log. I fixed those and I don't see the original error, but I do see other ArrayIndexOutOfBoundsExceptions in the Kryo serializer code (I didn't even enable that yet like you suggested). Examples:

1)

10:49:36,331 ERROR org.apache.flink.streaming.api.collector.StreamOutput - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 255
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:364)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:47)
    at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:89)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:29)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:58)
    at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:62)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSource.java:40)
    at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

2)

10:49:36,333 ERROR org.apache.flink.streaming.api.collector.StreamOutput - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 334
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:207)
    at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
    at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:629)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:594)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:89)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:29)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:58)
    at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:62)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSource.java:40)
    at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

3)

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 106
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
    at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
    at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

4)

java.lang.IllegalArgumentException: You can store only Strings, Integer and Longs in the ProtocolDetailMap, not: 'false' for 'null'
    at io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:100)
    at io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:23)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
    at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
    at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

5)

java.lang.IndexOutOfBoundsException: Index: 85, Size: 9
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
    at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
    at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

As you can tell, there's a common theme there, which is the MapSerializer in Kryo. The source of my grief seems to be the two java.util.Map implementations, ProtocolAttributeMap and ProtocolDetailMap. They're custom implementations and they're anal about what types of objects you can have as values.
Here's the output of the gist you asked me to run:

class org.apache.flink.api.java.typeutils.PojoTypeInfo : class io.pivotal.rti.protocols.ProtocolEvent (
  class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
  class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
  class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
  class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
  class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
  class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
  class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
  class org.apache.flink.api.java.typeutils.GenericTypeInfo : class io.pivotal.rti.protocols.ProtocolAttributeMap
  class org.apache.flink.api.java.typeutils.GenericTypeInfo : class io.pivotal.rti.protocols.ProtocolDetailMap
  class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
  class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Short
  class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
  class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
  class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
  class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Short
  class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
)

Right now, I'm using the gson library to convert the ProtocolEvent instance to JSON and back. I think I have to write a custom converter to make ProtocolEvent a proper POJO in order for it to work with the serializers in Flink.

Sorry for the long email.

Thanks,
Ali
Hi Ali,
Flink uses different serializers for different data types. For example, (boxed) primitives are serialized using dedicated serializers (IntSerializer, StringSerializer, etc.), and the ProtocolEvent class is recognized as a Pojo type and is therefore serialized using Flink's PojoSerializer. Types that cannot be (fully) analyzed are handled as GenericTypes and serialized using Flink's KryoSerializer.

By forcing Kryo serialization as I suggested before, Pojo types (such as ProtocolEvent) will be serialized with Kryo instead of Flink's PojoSerializer. Hence, forcing Kryo only affects Pojo types. GenericTypes (such as ProtocolAttributeMap and ProtocolDetailMap) are always handled by Kryo (even without forcing it).

The exceptions you are facing might be caused by a bug in the KryoSerializer that we recently fixed (see FLINK-2800 [1]). This bug basically corrupts the stream of serialized data and might very well also be responsible for the original exception you posted. As you can see from the JIRA issue, a fix was merged to all active branches; however, it is not yet contained in an official release.

I would recommend trying the latest candidate of the upcoming 0.10 release [2] or building Flink from the 0.9-release branch [3].

Please let me know if you have any questions or are still facing problems after switching to a version with a fix for FLINK-2800.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-2800
[2] http://people.apache.org/~mxm/flink-0.10.0-rc8/
[3] https://github.com/apache/flink/tree/release-0.9
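For reference, the forced-Kryo workaround as a minimal, self-contained job skeleton; the surrounding class and job name are illustrative, not from Ali's program:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForceKryoSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Route Pojo types such as ProtocolEvent through the KryoSerializer
        // instead of the PojoSerializer; GenericTypes use Kryo either way.
        env.getConfig().enableForceKryo();
        // ... define sources and operators as usual, then:
        env.execute("force-kryo-sketch");
    }
}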
2015-11-11 17:20 GMT+01:00 Kashmar, Ali <[hidden email]>:

> Fabian,
>
> I tried running it again and I noticed there were some more exceptions in
> the log. I fixed those and I don’t see the original error, but I do see
> other ArrayIndexOutOfBoundsExceptions in the Kryo serializer code (I didn’t
> even enable that yet like you suggested). Examples:
>
> 1)
>
> 10:49:36,331 ERROR org.apache.flink.streaming.api.collector.StreamOutput - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 255
>     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:364)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:47)
>     at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:89)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:29)
>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:58)
>     at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:62)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSource.java:40)
>     at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> 2)
>
> 10:49:36,333 ERROR org.apache.flink.streaming.api.collector.StreamOutput - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 334
>     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:207)
>     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
>     at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:629)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:594)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:89)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:29)
>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:58)
>     at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:62)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSource.java:40)
>     at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> 3)
>
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 106
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
>     at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
>     at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
>
> 4)
>
> java.lang.IllegalArgumentException: You can store only Strings, Integer and Longs in the ProtocolDetailMap, not: 'false' for 'null'
>     at io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:100)
>     at io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:23)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
>     at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
>     at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
>
> 5)
>
> java.lang.IndexOutOfBoundsException: Index: 85, Size: 9
>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>     at java.util.ArrayList.get(ArrayList.java:429)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
>     at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
>     at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
>
> As you can tell, there’s a common theme there, which is the MapSerializer
> in Kryo. The source of my grief seems to be the two java.util.Map
> implementations, ProtocolAttributeMap and ProtocolDetailMap. They’re custom
> implementations and they’re strict about what types of objects you can have
> as values.
> Here’s the output of the gist you asked me to run:
>
> class org.apache.flink.api.java.typeutils.PojoTypeInfo : class io.pivotal.rti.protocols.ProtocolEvent
> (
>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>   class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
>   class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
>   class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>   class org.apache.flink.api.java.typeutils.GenericTypeInfo : class io.pivotal.rti.protocols.ProtocolAttributeMap
>   class org.apache.flink.api.java.typeutils.GenericTypeInfo : class io.pivotal.rti.protocols.ProtocolDetailMap
>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>   class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Short
>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>   class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Short
>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
> )
>
> Right now, I’m using the gson library to convert the ProtocolEvent
> instance to JSON and back. I think I have to write a custom converter to
> make ProtocolEvent a proper POJO in order for it to work with the
> serializers in Flink.
>
> Sorry for the long email.
>
> Thanks,
> Ali
>
> On 2015-11-11, 10:02 AM, "Fabian Hueske" <[hidden email]> wrote:
>
> >Hi Ali,
> >
> >I looked into this issue. The problem seems to be caused by the
> >deserializer reading more data than it should. This can happen for two
> >reasons:
> > 1) the meta information about how much data is safe to read is incorrect, or
> > 2) the serializer and deserializer logic are out of sync, which can cause
> >the deserializer to read more data than the serializer wrote.
> >
> >The first case is less likely: Flink writes the binary length of each
> >record in front of its serialized representation. This happens whenever
> >data is sent over the network, regardless of the data type. A bug in this
> >part would be very critical, but it is also less likely because this path
> >is exercised very often and the problem has not occurred before.
> >
> >IMO, this looks like an issue in the serialization logic. Looking at your
> >code, the problem occurs when deserializing ProtocolEvent objects.
> >Is it possible for you to share this class with me?
> >
> >If you cannot share the class, it would be good to know the field types
> >of the Pojo and the associated TypeInformation. For that you can run the
> >code in this gist [1], which will recursively print the field types and
> >their TypeInformation.
> >
> >As a temporary workaround, you can try to use Kryo to serialize and
> >deserialize your Pojos as follows:
> >ExecutionEnvironment env = ...
> >env.getConfig().enableForceKryo();
> >
> >Best,
> >Fabian
> >
> >[1] https://gist.github.com/fhueske/6c5aa386fc79ab69712b
> >
> >[...] |
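For context on the Pojo/GenericType distinction discussed above: Flink treats a class as a Pojo when it is public, has a public no-argument constructor, and every field is either public or accessible through getters and setters; field types that cannot be fully analyzed fall back to Kryo as GenericTypes. A minimal sketch, with field names that are illustrative only and not the actual ProtocolEvent definition:

public class SimpleEvent {
    // Public fields of analyzable types keep the class a Pojo.
    public String subscriberId;
    public long timestamp;

    // A field type that Flink cannot fully analyze (like the custom
    // ProtocolDetailMap in this thread) is treated as a GenericType
    // and serialized with Kryo, while the class overall stays a Pojo.
    public java.util.Map<String, String> details;

    // A public no-argument constructor is required by the Pojo rules.
    public SimpleEvent() {}
}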
So the problem wasn’t in Flink after all. It turns out the data I was
receiving at the socket was not complete. So I went back and looked at the
way I’m sending data to the socket and realized that the socket was closed
before all the data was sent. I just needed to flush the stream before
closing the socket. I don’t see any more serialization errors now.

Thanks everyone for the help, and I apologize if I wasted your time with
this. I will stick with 0.9.1 for now, but I’ll download and use 0.10 as
soon as it’s released.

Cheers,
Ali

On 2015-11-11, 6:00 PM, "Fabian Hueske" <[hidden email]> wrote:

>[...] |
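The failure mode Ali describes is worth spelling out: if the sender closes a socket while bytes are still sitting in a buffered output stream, the receiver sees truncated records, which then surface downstream as deserialization errors like the ones above. A minimal sketch of the sending side with the fix applied; the host, port, and record format are illustrative assumptions, not details from the thread:

import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class RecordSender {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket("localhost", 9999);
             OutputStream out = new BufferedOutputStream(socket.getOutputStream())) {
            for (int i = 0; i < 1000; i++) {
                // One newline-delimited record per event.
                out.write(("event-" + i + "\n").getBytes(StandardCharsets.UTF_8));
            }
            // The fix: push any buffered bytes out before the socket closes.
            // (Closing the stream also flushes, but the explicit call makes
            // the intent clear and matches the fix described above.)
            out.flush();
        }
    }
}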
Ah, no problem.
Glad you could resolve your problem :-) Thanks for reporting back.

Cheers,
Fabian

2015-11-12 17:42 GMT+01:00 Kashmar, Ali <[hidden email]>:

> So the problem wasn’t in Flink after all. It turns out the data I was
> receiving at the socket was not complete. So I went back and looked at the
> way I’m sending data to the socket and realized that the socket was closed
> before all the data was sent. I just needed to flush the stream before
> closing the socket. I don’t see any more serialization errors now.
>
> Thanks everyone for the help, and I apologize if I wasted your time with
> this. I will stick with 0.9.1 for now, but I’ll download and use 0.10 as
> soon as it’s released.
>
> Cheers,
> Ali
>
> On 2015-11-11, 6:00 PM, "Fabian Hueske" <[hidden email]> wrote:
>
> >[...]
> >
> >2015-11-11 17:20 GMT+01:00 Kashmar, Ali <[hidden email]>:
> >
> >> Fabian,
> >>
> >> I tried running it again and I noticed there were some more exceptions
> >> in the log. I fixed those and I don’t see the original error, but I do
> >> see other ArrayIndexOutOfBoundsExceptions in the Kryo serializer code
> >> (I didn’t even enable that yet like you suggested).
Examples: > >> > >> 1) > >> > >> 10:49:36,331 ERROR org.apache.flink.streaming.api.collector.StreamOutput > >> - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: > >>255 > >> at > >> > >>com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectI > >>nt > >> Map.java:364) > >> at > >> > >>com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceRes > >>ol > >> ver.java:47) > >> at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836) > >> at > >> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601) > >> at > >> > >>com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.j > >>av > >> a:95) > >> at > >> > >>com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.j > >>av > >> a:21) > >> at > >> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize > >>(K > >> ryoSerializer.java:186) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(Pojo > >>Se > >> rializer.java:372) > >> at > >> > >> > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.se > >>ri > >> alize(StreamRecordSerializer.java:89) > >> at > >> > >> > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.se > >>ri > >> alize(StreamRecordSerializer.java:29) > >> at > >> > >>org.apache.flink.runtime.plugable.SerializationDelegate.write(Serializati > >>on > >> Delegate.java:51) > >> at > >> > >>org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSeria > >>li > >> zer.addRecord(SpanningRecordSerializer.java:76) > >> at > >> > >>org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWr > >>it > >> er.java:83) > >> at > >> > >>org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecor > >>dW > >> riter.java:58) > >> at > >> > >>org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutpu > >>t. 
>>>
>>> 2)
>>>
>>> 10:49:36,333 ERROR org.apache.flink.streaming.api.collector.StreamOutput - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 334
>>>   at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:207)
>>>   at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>>>   at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
>>>   at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:629)
>>>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:594)
>>>   at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
>>>   at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
>>>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
>>>   at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372)
>>>   at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:89)
>>>   at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:29)
>>>   at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>>   at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>>   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
>>>   at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:58)
>>>   at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:62)
>>>   at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>>>   at org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSource.java:40)
>>>   at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
>>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>   at java.lang.Thread.run(Thread.java:745)
>>>
>>> 3)
>>>
>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 106
>>>   at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>   at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>   at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
>>>   at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
>>>   at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
>>>   at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
>>>   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
>>>   at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
>>>   at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
>>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
>>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>   at java.lang.Thread.run(Thread.java:745)
>>>
>>> 4)
>>>
>>> java.lang.IllegalArgumentException: You can store only Strings, Integer and Longs in the ProtocolDetailMap, not: 'false' for 'null'
>>>   at io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:100)
>>>   at io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:23)
>>>   at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
>>>   at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
>>>   at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
>>>   at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
>>>   at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
>>>   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
>>>   at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
>>>   at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
>>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
>>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>   at java.lang.Thread.run(Thread.java:745)
>>>
>>> 5)
>>>
>>> java.lang.IndexOutOfBoundsException: Index: 85, Size: 9
>>>   at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>>>   at java.util.ArrayList.get(ArrayList.java:429)
>>>   at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>   at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>>   at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
>>>   at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
>>>   at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
>>>   at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
>>>   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
>>>   at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
>>>   at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
>>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
>>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>   at java.lang.Thread.run(Thread.java:745)
>>>
>>> As you can tell, there is a common theme here: the MapSerializer in
>>> Kryo. The source of my grief seems to be the two java.util.Map
>>> implementations, ProtocolAttributeMap and ProtocolDetailMap. They are
>>> custom implementations, and they are strict about what types of objects
>>> you can have as values.
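A side note on those two map classes: because Flink can only treat them as GenericTypes, they always go through Kryo. Registering them up front gives them stable Kryo registration IDs, which helps rule out registration mismatches when chasing "Encountered unregistered class ID" errors (in this thread the real cause turned out to be corrupted input, so this is diagnostic hygiene, not the fix). A minimal sketch, assuming the 0.9.x ExecutionConfig API and using a stand-in class, since the real map types are not shown in the thread:

    import java.util.HashMap;
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KryoRegistrationSketch {
        // Stand-in for the strict map types (ProtocolAttributeMap / ProtocolDetailMap).
        public static class StrictDetailMap extends HashMap<String, Object> {}

        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            ExecutionConfig config = env.getConfig();

            // Pre-register the concrete class so sender and receiver agree on its
            // Kryo registration ID instead of relying on ad-hoc registration order.
            config.registerKryoType(StrictDetailMap.class);
        }
    }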
>>>
>>> Here's the output of the gist you asked me to run:
>>>
>>> class org.apache.flink.api.java.typeutils.PojoTypeInfo : class io.pivotal.rti.protocols.ProtocolEvent
>>> (
>>>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>>>   class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
>>>   class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
>>>   class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
>>>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>>>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>>>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>>>   class org.apache.flink.api.java.typeutils.GenericTypeInfo : class io.pivotal.rti.protocols.ProtocolAttributeMap
>>>   class org.apache.flink.api.java.typeutils.GenericTypeInfo : class io.pivotal.rti.protocols.ProtocolDetailMap
>>>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>>>   class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Short
>>>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>>>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>>>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>>>   class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Short
>>>   class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>>> )
>>>
>>> Right now I'm using the gson library to convert each ProtocolEvent
>>> instance to JSON and back. I think I have to write a custom converter to
>>> make ProtocolEvent a proper POJO in order for it to work with the
>>> serializers in Flink.
>>>
>>> Sorry for the long email.
>>>
>>> Thanks,
>>> Ali
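For context on "a proper POJO": Flink's analyzer produces a PojoTypeInfo like the one above when the class is public, has a public no-argument constructor, and every field is either public or reachable via getters and setters; fields whose types it cannot analyze (here the two custom map classes) show up as GenericTypeInfo and fall back to Kryo. A compressed, hypothetical sketch of a shape that would analyze fully, with invented field names since the real ProtocolEvent is not shown in the thread:

    // Hypothetical, simplified shape; the real ProtocolEvent has 16 fields
    // (see the PojoTypeInfo dump above), including two custom map types that
    // would still be GenericTypes and therefore still serialized with Kryo.
    public class ProtocolEventSketch {
        public String subscriberId;  // -> BasicTypeInfo<String>
        public Long startTime;       // -> IntegerTypeInfo<Long>
        public Short version;        // -> IntegerTypeInfo<Short>

        // POJO rules: public class + public no-arg constructor +
        // public fields (or private fields with getters/setters).
        public ProtocolEventSketch() {}
    }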
>>>
>>> On 2015-11-11, 10:02 AM, "Fabian Hueske" <[hidden email]> wrote:
>>>
>>>> Hi Ali,
>>>>
>>>> I looked into this issue. The problem seems to be caused by the
>>>> deserializer reading more data than it should. This can happen for two
>>>> reasons:
>>>>  1) the meta information about how much data is safe to read is
>>>> incorrect, or
>>>>  2) the serializer and deserializer logic are out of sync, which can
>>>> cause the deserializer to read more data than the serializer wrote.
>>>>
>>>> The first case is less likely: Flink writes the binary length of each
>>>> record in front of its serialized representation. This happens whenever
>>>> data is sent over the network, regardless of the data type. A bug in
>>>> this part would be very serious, but it is also unlikely because this
>>>> code path is exercised very often and the problem has not shown up
>>>> before.
>>>>
>>>> IMO, this looks like an issue in the serialization logic. Looking at
>>>> your code, the problem occurs when deserializing ProtocolEvent objects.
>>>> Is it possible for you to share this class with me?
>>>>
>>>> If it is not possible to share the class, it would be good to know the
>>>> field types of the POJO and the associated TypeInformation. For that,
>>>> you can run the code in this gist [1], which will recursively print the
>>>> field types and their TypeInformation.
>>>>
>>>> As a temporary workaround, you can try to use Kryo to serialize and
>>>> deserialize your POJOs as follows:
>>>>
>>>>     ExecutionEnvironment env = ...
>>>>     env.getConfig().enableForceKryo();
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>> [1] https://gist.github.com/fhueske/6c5aa386fc79ab69712b
>>>>
>>>> 2015-11-11 10:38 GMT+01:00 Fabian Hueske <[hidden email]>:
>>>>
>>>>> Hi Ali,
>>>>>
>>>>> one more thing. Did that error occur once or is it reproducible?
>>>>>
>>>>> Thanks for your help,
>>>>> Fabian
>>>>>
>>>>> 2015-11-11 9:50 GMT+01:00 Ufuk Celebi <[hidden email]>:
>>>>>
>>>>>> Hey Ali,
>>>>>>
>>>>>> thanks for sharing the code. I assume that the custom ProtocolEvent,
>>>>>> ProtocolDetailMap, and Subscriber types are all POJOs. They should
>>>>>> not be a problem. I think this is a bug in Flink 0.9.1.
>>>>>>
>>>>>> Is it possible to re-run your program with the upcoming 0.10.0 (RC8)
>>>>>> version and report back?
>>>>>>
>>>>>> 1) Add
>>>>>> https://repository.apache.org/content/repositories/orgapacheflink-1055
>>>>>> as a snapshot repository:
>>>>>>
>>>>>> <repositories>
>>>>>>   <repository>
>>>>>>     <id>apache.snapshots</id>
>>>>>>     <name>Apache Development Snapshot Repository</name>
>>>>>>     <url>https://repository.apache.org/content/repositories/orgapacheflink-1055</url>
>>>>>>     <releases><enabled>false</enabled></releases>
>>>>>>     <snapshots><enabled>true</enabled></snapshots>
>>>>>>   </repository>
>>>>>> </repositories>
>>>>>>
>>>>>> 2) Set the Flink dependency version to 0.10.0 (a sketch of the
>>>>>> corresponding pom.xml change follows below).
>>>>>>
>>>>>> 3) Use the Flink binary matching your Hadoop installation from here:
>>>>>> http://people.apache.org/~mxm/flink-0.10.0-rc8/ (since you use Java,
>>>>>> you can go with the Scala 2.10 builds).
>>>>>>
>>>>>> Sorry for the inconvenience! The release is about to be finished (the
>>>>>> voting process is already going on).
>>>>>>
>>>>>> Ufuk
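To make step 2 concrete: the version bump is a one-line change per Flink dependency in the project's pom.xml. A sketch under the assumption that the job uses the streaming API and the 0.10 artifact names (the streaming module is assumed to be flink-streaming-java in the 0.10 line; verify the names against the release candidate):

    <!-- Sketch only: artifact names assumed for the 0.10 line. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>0.10.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>0.10.0</version>
    </dependency>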
> >> >>> > >> In case you are using a custom build, can you share the commit > >>ID > >> >>>(is > >> >>> > >> reported in the first lines of the JobManager log file)? > >> >>> > >> > >> >>> > >> Thanks, Fabian > >> >>> > >> > >> >>> > >> 2015-11-10 18:29 GMT+01:00 Robert Metzger <[hidden email] > >> >>> > >> <javascript:;>>: > >> >>> > >> > >> >>> > >> > Hi Ali, > >> >>> > >> > > >> >>> > >> > this could be a bug in Flink. > >> >>> > >> > Can you share the code of your program with us to debug the > >> >>>issue? > >> >>> > >> > > >> >>> > >> > On Tue, Nov 10, 2015 at 6:25 PM, Kashmar, Ali > >> >>><[hidden email] > >> >>> > >> <javascript:;>> wrote: > >> >>> > >> > > >> >>> > >> > > Hello, > >> >>> > >> > > > >> >>> > >> > > I¹m getting this error while running a streaming module on > >>a > >> >>> cluster > >> >>> > >> of 3 > >> >>> > >> > > nodes: > >> >>> > >> > > > >> >>> > >> > > > >> >>> > >> > > java.lang.ArrayIndexOutOfBoundsException: 32768 > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > >> >>> > >>org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:178) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.runtime.io.network.api.serialization.SpillingAdapti > >>>>>>>ve > >> >>>>>Spa > >> >>> > > >> >>> > >> > >>>>>>>nningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptive > >>>>>>>Sp > >> >>>>>ann > >> >>> > >>ingRecordDeserializer.java:214) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.runtime.io.network.api.serialization.SpillingAdapti > >>>>>>>ve > >> >>>>>Spa > >> >>> > > >> >>> > >> > >>>>>>>nningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(Spilling > >>>>>>>Ad > >> >>>>>apt > >> >>> > >>iveSpanningRecordDeserializer.java:219) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > > >>>>org.apache.flink.types.StringValue.readString(StringValue.java:764) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.api.common.typeutils.base.StringSerializer.deserial > >>>>>>>iz > >> >>>>>e(S > >> >>> > >>tringSerializer.java:68) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.api.common.typeutils.base.StringSerializer.deserial > >>>>>>>iz > >> >>>>>e(S > >> >>> > >>tringSerializer.java:73) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.api.common.typeutils.base.StringSerializer.deserial > >>>>>>>iz > >> >>>>>e(S > >> >>> > >>tringSerializer.java:28) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deseriali > >>>>>>>ze > >> >>>>>(Po > >> >>> > >>joSerializer.java:499) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer > >>>>>.d > >> >>>e > >> >>> > >>serialize(StreamRecordSerializer.java:102) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer > >>>>>.d > >> >>>e > >> >>> > 
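The sender-side fix Ali describes at the top of the thread amounts to flushing the output stream before the socket closes, so the last buffered records are not truncated mid-record. A minimal sketch with invented names, since the actual sending code is not shown in the thread:

    import java.io.BufferedOutputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    import java.nio.charset.StandardCharsets;

    public class EventSenderSketch {
        public static void main(String[] args) throws Exception {
            try (Socket socket = new Socket("localhost", 9999)) {
                OutputStream out = new BufferedOutputStream(socket.getOutputStream());
                out.write("example event payload\n".getBytes(StandardCharsets.UTF_8));

                // The fix: flush buffered bytes before the socket is closed.
                // Closing without flushing can drop the tail of the stream, which
                // the receiving Flink job then sees as corrupted, partial records.
                out.flush();
            } // try-with-resources closes the socket after the flush
        }
    }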