Question about RocksDBStateBackend Compaction Filter state cleanup

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Question about RocksDBStateBackend Compaction Filter state cleanup

LakeShen
Hi community ,

I see the flink RocksDBStateBackend state cleanup,now the code like this :

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();



> The default background cleanup for RocksDB backend queries the current
> timestamp each time 1000 entries have been processed.


What's the meaning of  1000 entries? 1000 different key ?

Thanks to your reply.

Best regards,
LakeShen
Reply | Threaded
Open this post in threaded view
|

Re: Question about RocksDBStateBackend Compaction Filter state cleanup

Yun Tang
Hi Lake

Flink leverage RocksDB's background compaction mechanism to filter out-of-TTL entries (by comparing with current timestamp provided from RocksDB's time_provider) to not let them stay in newly compacted data.

This would iterator over data entries with FlinkCompactionFilter::FilterV2 [1], and the parameter 'queryTimeAfterNumEntries' in Flink indicates the threshold 'query_time_after_num_entries_' in FrocksDB  [2]. Once RocksDB iterator more than several entries .e.g 1000, it would call time_provider to update current timestamp to let the process of cleaning up more eagerly and accurately.

[1] https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/utilities/flink/flink_compaction_filter.cc#L107
[2] https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/utilities/flink/flink_compaction_filter.h#L140

Best
Yun Tang

________________________________
From: LakeShen <[hidden email]>
Sent: Tuesday, March 17, 2020 15:30
To: dev <[hidden email]>; user-zh <[hidden email]>; user <[hidden email]>
Subject: Question about RocksDBStateBackend Compaction Filter state cleanup

Hi community ,

I see the flink RocksDBStateBackend state cleanup,now the code like this :


StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();


The default background cleanup for RocksDB backend queries the current timestamp each time 1000 entries have been processed.

What's the meaning of  1000 entries? 1000 different key ?

Thanks to your reply.

Best regards,
LakeShen
Reply | Threaded
Open this post in threaded view
|

Re: Question about RocksDBStateBackend Compaction Filter state cleanup

Andrey Zagrebin-4
Hi Lake,

When the Flink doc mentions a state entry in RocksDB, we mean one key/value
pair stored by user code over any keyed state API
(keyed context in keyed operators obtained e.g. from keyBy()
transformation).
In case of map or list, the doc means map key/value and list element.

- value/aggregating/folding/reducing state: key -> value
- map state: key -> map key -> value
- list state: key -> list -> element in some position

Best,
Andrey

On Tue, Mar 17, 2020 at 11:04 AM Yun Tang <[hidden email]> wrote:

> Hi Lake
>
> Flink leverage RocksDB's background compaction mechanism to filter
> out-of-TTL entries (by comparing with current timestamp provided from
> RocksDB's time_provider) to not let them stay in newly compacted data.
>
> This would iterator over data entries with FlinkCompactionFilter::FilterV2
> [1], and the parameter 'queryTimeAfterNumEntries' in Flink indicates the
> threshold 'query_time_after_num_entries_' in FrocksDB  [2]. Once RocksDB
> iterator more than several entries .e.g 1000, it would call time_provider
> to update current timestamp to let the process of cleaning up more eagerly
> and accurately.
>
> [1]
> https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/utilities/flink/flink_compaction_filter.cc#L107
> [2]
> https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/utilities/flink/flink_compaction_filter.h#L140
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* LakeShen <[hidden email]>
> *Sent:* Tuesday, March 17, 2020 15:30
> *To:* dev <[hidden email]>; user-zh <[hidden email]>;
> user <[hidden email]>
> *Subject:* Question about RocksDBStateBackend Compaction Filter state
> cleanup
>
> Hi community ,
>
> I see the flink RocksDBStateBackend state cleanup,now the code like this :
>
> StateTtlConfig ttlConfig = StateTtlConfig
>     .newBuilder(Time.seconds(1))
>     .cleanupInRocksdbCompactFilter(1000)
>     .build();
>
>
>
> The default background cleanup for RocksDB backend queries the current
> timestamp each time 1000 entries have been processed.
>
>
> What's the meaning of  1000 entries? 1000 different key ?
>
> Thanks to your reply.
>
> Best regards,
> LakeShen
>