Si Chen created FLINK-22608:
-------------------------------
Summary: Flink Kryo deserialize read wrong bytes
Key: FLINK-22608
URL: https://issues.apache.org/jira/browse/FLINK-22608
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing, Runtime / State Backends
Affects Versions: 1.12.0
Reporter: Si Chen
In my Flink program, I use ValueState to hold my state. The state stores a POJO, and that POJO is serialized with the Kryo serializer. After the program had been running for some time, I added a field to the POJO and then restored the program from a checkpoint. The value of the new field was incorrect. Reading the source code, I found:
{code:java}
// org.apache.flink.runtime.state.heap.HeapRestoreOperation#readStateHandleStateData
private void readStateHandleStateData(
        FSDataInputStream fsDataInputStream,
        DataInputViewStreamWrapper inView,
        KeyGroupRangeOffsets keyGroupOffsets,
        Map<Integer, StateMetaInfoSnapshot> kvStatesById,
        int numStates,
        int readVersion,
        boolean isCompressed) throws IOException {

    final StreamCompressionDecorator streamCompressionDecorator = isCompressed ?
        SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;

    for (Tuple2<Integer, Long> groupOffset : keyGroupOffsets) {
        int keyGroupIndex = groupOffset.f0;
        long offset = groupOffset.f1;

        // Check that restored key groups all belong to the backend.
        Preconditions.checkState(keyGroupRange.contains(keyGroupIndex), "The key group must belong to the backend.");

        // Only the start offset of the key group is used; no end bound is applied to the read.
        fsDataInputStream.seek(offset);

        int writtenKeyGroupIndex = inView.readInt();
        Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
            "Unexpected key-group in restore.");

        try (InputStream kgCompressionInStream =
                 streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
            readKeyGroupStateData(
                kgCompressionInStream,
                kvStatesById,
                keyGroupIndex,
                numStates,
                readVersion);
        }
    }
}
{code}
For my state, the keyGroupIndex is 81 and its key-group offset is 3572. The next key-group offset is 3611, so my state's bytes span offsets 3572 to 3611 (39 bytes). But after I add the new field to the POJO, Kryo reads more bytes than were written for this key group and continues into the bytes of the next key group.
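To illustrate the effect, here is a minimal sketch that uses only java.io and involves neither Flink nor Kryo. It assumes a serializer that writes fields back to back with no length prefix or schema information; the class name KeyGroupOverreadDemo, both method names, and the two-int record layout are hypothetical and chosen only for this illustration. Two adjacent "key groups" are written with the old layout (one int each), and the first one is then read with the new layout (two ints). Nothing stops the reader at the end of the first key group, so the added field is filled with the bytes of the second one.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical demo class: simulates the over-read with plain java.io, not Flink or Kryo.
class KeyGroupOverreadDemo {

    // Writes two adjacent "key groups", each holding one record in the OLD layout (a single int).
    static byte[] writeOldLayout(int firstValue, int secondValue) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(firstValue);  // key group A occupies bytes [0, 4)
            out.writeInt(secondValue); // key group B occupies bytes [4, 8)
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads one record starting at the given offset using the NEW layout (two ints).
    // Like the restore loop, it knows only the start offset, not where the key group ends.
    static int[] readWithNewLayout(byte[] data, int offset) {
        ByteArrayInputStream in = new ByteArrayInputStream(data, offset, data.length - offset);
        try (DataInputStream view = new DataInputStream(in)) {
            int existingField = view.readInt(); // bytes that really belong to this record
            int addedField = view.readInt();    // crosses into the next key group's bytes
            return new int[] {existingField, addedField};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] snapshot = writeOldLayout(7, 42);
        int[] restored = readWithNewLayout(snapshot, 0);
        System.out.println("existing field = " + restored[0]); // prints 7
        System.out.println("added field    = " + restored[1]); // prints 42, taken from key group B
    }
}
```

In this sketch the added field comes back as 42, which is the value stored for the neighbouring key group, not a default value. That matches the symptom above: the restored field holds an incorrect value instead of the read failing.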
--
This message was sent by Atlassian Jira
(v8.3.4#803005)