Flink 1.9: State TTL configuration does not work

YangPeng007
Hi, I have some questions about state TTL that I would like to ask everybody.
The state backend is RocksDB. Below is my code:
-----------------code begin-------------
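import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

private static final String EV_STATE_FLAG = "EV_EID_FLAG";

// Entries expire 60 minutes after they are created or last written,
// and expired entries are never returned to the caller.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.minutes(60))
        .updateTtlOnCreateAndWrite()
        .neverReturnExpired()
        // Remove expired entries in the RocksDB compaction filter.
        .cleanupInRocksdbCompactFilter(1000)
        .build();
MapStateDescriptor<String, Integer> eidMapStateDesc = new MapStateDescriptor<>(
        EV_STATE_FLAG, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
eidMapStateDesc.enableTimeToLive(ttlConfig);
eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);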

-----------------code end-----------------
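
To make clear what I expect from this configuration, here is a small
plain-Java toy model of the semantics. It is only a sketch and not how Flink
or RocksDB implement TTL: the class TtlMapSketch and all of its methods are
hypothetical names made up for illustration, and time is passed in explicitly.
An entry expires 60 minutes after its last write, expired entries are hidden
from reads, and they are physically removed only when a compaction pass runs.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a map with TTL; hypothetical, NOT Flink's implementation. */
final class TtlMapSketch {

    /** A stored value together with the time of its last write. */
    private static final class Entry {
        final int value;
        final long lastWriteMillis;

        Entry(int value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, Entry> store = new HashMap<>();

    TtlMapSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Models "update TTL on create and write": every put resets the timestamp. */
    void put(String key, int value, long nowMillis) {
        store.put(key, new Entry(value, nowMillis));
    }

    /** Models "never return expired": an expired entry is hidden but not deleted. */
    Integer get(String key, long nowMillis) {
        Entry e = store.get(key);
        if (e == null || isExpired(e, nowMillis)) {
            return null;
        }
        return e.value;
    }

    /** Models cleanup during compaction: drops expired entries, returns how many. */
    int compact(long nowMillis) {
        int before = store.size();
        store.values().removeIf(e -> isExpired(e, nowMillis));
        return before - store.size();
    }

    /** Number of entries still physically stored, expired or not. */
    int physicalSize() {
        return store.size();
    }

    private boolean isExpired(Entry e, long nowMillis) {
        return nowMillis - e.lastWriteMillis >= ttlMillis;
    }
}
```

In this model an entry written at time 0 with a 60-minute TTL is no longer
returned by get() after 60 minutes, but physicalSize() only drops once
compact() has been called.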

I have set the state TTL to 60 minutes, but after 12 hours the RocksDB
metrics show that the SST files of the column family EV_EID_FLAG keep
growing, with no decreasing trend. Later we found this message in the
TaskManager log:

WARN org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL
compaction filter for state < EV_EID_FLAG >: feature is disabled for the
state backend

After I added the parameter
"state.backend.rocksdb.ttl.compaction.filter.enabled: true", the warning
disappeared, but after the job completed a few checkpoints, every following
checkpoint failed. I checked with jstack and found that the failing
checkpoint was stuck acquiring state, while disk I/O was idle. When I remove
the parameter "state.backend.rocksdb.ttl.compaction.filter.enabled: true",
the job checkpoints normally again. So I am asking everyone here: is my
usage wrong?