Tzu-Li (Gordon) Tai created FLINK-11094:
-------------------------------------------
Summary: Restored state in RocksDBStateBackend that has not been accessed in new execution causes NPE on snapshot
Key: FLINK-11094
URL:
https://issues.apache.org/jira/browse/FLINK-11094 Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Affects Versions: 1.7.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
Fix For: 1.7.1
This was caused by changes in FLINK-10679.
The problem is that in that change, in the {{RocksDBKeyedBackend}}, {{RegisteredStateMetaInfoBase}}s were no longer created eagerly for all restored state, but instead only lazily created when the state was accessed again by the user. This causes non-accessed restored state to have empty meta info, and throws NPE when trying to take a snapshot of them.
The rationale behind FLINK-10679 was that, since {{RegisteredStateMetaInfoBase}} holds already serializer instances for state access, creating them eagerly at restore time with restored serializer snapshots did not make sense (because at that point-in-time, we do not have the new serializers yet for state access; the snapshot is only capable of creating the previous state serializer).
I propose the following:
Instead of having final {{TypeSerializer}} instances in {{RegisteredStateMetaInfoBase}}s, they should have a {{StateSerializerProvider}} instead.
The {{StateSerializerProvider}} would have the following methods:
{code}
public class StateSerializerProvider<T> {
TypeSerializer<T> getCurrentSerializer();
TypeSerializer<T> updateCurrentSerializer(TypeSerializer<T> newSerializer);
TypeSerializer<T> getPreviousSerializer();
}
{code}
A {{StateSerializerProvider}} can be created either from:
1) A restored serializer snapshot when restoring the state.
2) A fresh, new state's serializer, when registering the state for the first time.
For 1), state that has not been accessed after the restore will return the same serializer (i.e. the previous serializer) for both {{getPreviousSerializer}} and {{getCurrentSerializer}}. Once a restored state is re-accessed, then {{updateCurrentSerializer(TypeSerializer<T> newSerializer)}} should be used to update what serializer the provider returns in {{getCurrentSerializer}}.
We could also make use of this new abstraction to move away some of the new serializer's compatibility checks from the state backend to {{StateSerializerProvider#updateCurrentSerializer}}.
For tests, apparently we're lacking test coverage for restored state that has not been accessed and being snapshotted again. This should be included as part of the fix.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)