Jürgen Kreileder created FLINK-11420:
----------------------------------------

             Summary: Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBounds
                 Key: FLINK-11420
                 URL: https://issues.apache.org/jira/browse/FLINK-11420
             Project: Flink
          Issue Type: Bug
          Components: Type Serialization System
    Affects Versions: 1.7.1
            Reporter: Jürgen Kreileder

We frequently run into random ArrayIndexOutOfBounds exceptions when Flink tries to serialize Scala case classes containing a Map[String, Any] (Any being String, Long, Int, or Boolean) with the FsStateBackend. (This probably happens with any case class containing a type that requires Kryo; see this thread for instance: [http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=Df6TLfeOJsB_rHWxs_rUoyLqcqv2gVWQTtfhA@...%3e])

Disabling asynchronous snapshots seems to work around the problem, so maybe something is not thread-safe in CaseClassSerializer.

Our objects look like this:
{code}
case class Event(timestamp: Long, [...], content: Map[String, Any])
case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any])
{code}

I've looked at a few of the exceptions in a debugger. It always happens when serializing the right-hand side of a tuple from EnrichedEvent -> Event -> content, e.g. 13 from ("foo", 13) or false from ("bar", false).
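For reference, the workaround of disabling asynchronous snapshots can be sketched roughly as follows. This is a minimal sketch, not our actual job setup: it assumes the two-argument FsStateBackend constructor (checkpoint URI, asynchronousSnapshots flag), and the object name and checkpoint URI are placeholders.
{code:scala}
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical object name, for illustration only.
object SyncSnapshotWorkaround {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder checkpoint location; replace with your own URI.
    val checkpointUri = "file:///tmp/flink-checkpoints"

    // The second constructor argument is the asynchronousSnapshots flag;
    // passing false makes the heap state backend snapshot synchronously.
    val backend: StateBackend = new FsStateBackend(checkpointUri, false)
    env.setStateBackend(backend)

    // ... define sources, windows, and sinks, then call env.execute(...)
  }
}
{code}
Synchronous snapshots block record processing while a checkpoint is taken, so this trades throughput for avoiding the exception rather than fixing the underlying cause.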
Stacktrace:
{code:java}
java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0
    at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
    at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69)
    at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
    at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.base/java.lang.Thread.run(Thread.java:834)
{code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)