Andrey Zagrebin created FLINK-9513:
--------------------------------------
Summary: Wrap state binder with TTL logic
Key: FLINK-9513
URL:
https://issues.apache.org/jira/browse/FLINK-9513 Project: Flink
Issue Type: Sub-task
Components: State Backends, Checkpointing
Affects Versions: 1.6.0
Reporter: Andrey Zagrebin
Fix For: 1.6.0
The main idea is to wrap user state value with a class holding the value and the expiration timestamp (maybe meta data in future) and use the new object as a value in the existing implementations:
{code:java}
class TtlValue<V> {
V value;
long expirationTimestamp;
}
{code}
The original state binder factory is wrapped with TtlStateBinder if TTL is enabled:
{code:java}
state = ttlConfig.updateType == DISABLED ?
bind(binder) : bind(new TtlStateBinder(binder, timerService));
{code}
TtlStateBinder decorates the states produced by the original binder with TTL logic wrappers and adds TtlValue serialisation logic:
{code:java}
TtlStateBinder {
StateBinder binder;
ProcessingTimeProvier timeProvider; // System.currentTimeMillis()
<V> TtlValueState<V> createValueState(valueDesc) {
serializer = new TtlValueSerializer(valueDesc.getSerializer);
ttlValueDesc = new ValueDesc(serializer, ...);
// or implement custom TypeInfo
originalStateWithTtl = binder.createValueState(valueDesc);
return new TtlValueState(originalStateWithTtl, timeProvider);
}
// List, Map, ...
}
{code}
| |
| |
TTL serializer should add expiration timestamp
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)