[jira] [Created] (FLINK-22608) Flink Kryo deserialize read wrong bytes


Shang Yuanchun (Jira)
Si Chen created FLINK-22608:
-------------------------------

             Summary: Flink Kryo deserialize read wrong bytes
                 Key: FLINK-22608
                 URL: https://issues.apache.org/jira/browse/FLINK-22608
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing, Runtime / State Backends
    Affects Versions: 1.12.0
            Reporter: Si Chen


In my Flink program, I use ValueState to hold my state. The state stores a POJO, but the POJO is serialized with the Kryo serializer. After the program had been running for some time, I added a field to the POJO and then restored the program from a checkpoint. The value of the new field was incorrect. Reading the source code, I found:

 
{code:java}
// From org.apache.flink.runtime.state.heap.HeapRestoreOperation#readStateHandleStateData

private void readStateHandleStateData(
   FSDataInputStream fsDataInputStream,
   DataInputViewStreamWrapper inView,
   KeyGroupRangeOffsets keyGroupOffsets,
   Map<Integer, StateMetaInfoSnapshot> kvStatesById,
   int numStates,
   int readVersion,
   boolean isCompressed) throws IOException {

   final StreamCompressionDecorator streamCompressionDecorator = isCompressed ?
      SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;

   for (Tuple2<Integer, Long> groupOffset : keyGroupOffsets) {
      int keyGroupIndex = groupOffset.f0;
      long offset = groupOffset.f1;

      // Check that restored key groups all belong to the backend.
      Preconditions.checkState(keyGroupRange.contains(keyGroupIndex), "The key group must belong to the backend.");

      fsDataInputStream.seek(offset);

      int writtenKeyGroupIndex = inView.readInt();
      Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
         "Unexpected key-group in restore.");

      try (InputStream kgCompressionInStream =
             streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {

         readKeyGroupStateData(
            kgCompressionInStream,
            kvStatesById,
            keyGroupIndex,
            numStates,
            readVersion);
      }
   }
}
{code}
My state's keyGroupIndex is 81 and its keyGroupOffset is 3572. The next keyGroupOffset is 3611, so my state occupies the offset range 3572 to 3611. But after I added the new field to the POJO, Kryo reads more bytes than were written for my state and continues into the bytes of the next key group.
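The effect can be illustrated with a minimal sketch. It does not use Kryo or Flink classes; it uses plain `java.nio.ByteBuffer` and assumes a serializer that writes fields back-to-back with no length prefix or field tags. The class name, method names, and field values are hypothetical and only stand in for my real state.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration only: simulates two adjacent key groups in one snapshot buffer.
class KeyGroupOverreadSketch {

    // Writes two key groups in the OLD record layout: key-group index, id, count.
    static ByteBuffer writeSnapshot() {
        ByteBuffer buf = ByteBuffer.allocate(24);
        buf.putInt(81).putInt(7).putInt(42); // key group 81 occupies bytes 0..11
        buf.putInt(82).putInt(8).putInt(43); // key group 82 starts at byte 12
        buf.flip();
        return buf;
    }

    // Reads one key group with the NEW record layout: key-group index, id, count, extra.
    // The old data has no "extra" field, so the fourth read crosses the key-group boundary.
    static int[] readWithNewLayout(ByteBuffer snapshot, int offset) {
        snapshot.position(offset);
        int keyGroup = snapshot.getInt();
        int id = snapshot.getInt();
        int count = snapshot.getInt();
        int extra = snapshot.getInt(); // actually the index of the next key group
        return new int[] {keyGroup, id, count, extra};
    }

    public static void main(String[] args) {
        int[] record = readWithNewLayout(writeSnapshot(), 0);
        System.out.println("keyGroup=" + record[0] + " extra=" + record[3]);
        // prints "keyGroup=81 extra=82"
    }
}
```

In this sketch the new field is silently filled with the header of key group 82 instead of failing, which matches the incorrect field value I observed after restore.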

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)