Hi Community,
I have a job running on Flink 1.9.0 on YARN, using RocksDB with checkpoints on HDFS and incremental checkpointing enabled. I have some MapState in my code with the following config:

val ttlConfig = StateTtlConfig
  .newBuilder(Time.minutes(30))
  .updateTtlOnCreateAndWrite()
  .cleanupInBackground()
  .cleanupFullSnapshot()
  .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)

After running for around 2 days, the checkpoint folder looks like this:

 44.4 M  /flink-chk743e4568a70b626837b/chk-40
 65.9 M  /flink-chk743e4568a70b626837b/chk-41
 91.7 M  /flink-chk743e4568a70b626837b/chk-42
 96.1 M  /flink-chk743e4568a70b626837b/chk-43
 48.1 M  /flink-chk743e4568a70b626837b/chk-44
 71.6 M  /flink-chk743e4568a70b626837b/chk-45
 50.9 M  /flink-chk743e4568a70b626837b/chk-46
 90.2 M  /flink-chk743e4568a70b626837b/chk-37
 49.3 M  /flink-chk743e4568a70b626837b/chk-38
 96.9 M  /flink-chk743e4568a70b626837b/chk-39
797.9 G  /flink-chk743e4568a70b626837b/shared

The ./shared folder keeps growing and does not seem to be cleaned up. However, while I disabled incremental cleanup, the expired full snapshot will be removed automatically. Is there any way to remove outdated state on HDFS to stop it from growing?

Thanks.

--
Best Wishes,
Shuwen Zhou
Hi Shuwen,
The “shared” folder holds state files that are shared among multiple checkpoints, which happens when you enable incremental checkpointing [1]. Therefore, it’s reasonable that the size keeps growing if you set “state.checkpoints.num-retained” to a big value.

[1] https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html

Best,
Jiayi Liao

Original Message
Sender: shuwen zhou <[hidden email]>
Recipient: dev <[hidden email]>
Date: Tuesday, Nov 5, 2019 17:59
Subject: RocksDB state on HDFS seems not being cleanned up

> [...]
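The retention behavior Jiayi describes can be pictured with a small toy model. This is a sketch in plain Scala, not Flink code: the `Checkpoint` case class and the helper names are hypothetical, and the only assumption carried over from the thread is that a shared file has to stay on HDFS for as long as at least one retained checkpoint references it.

```scala
// Toy model of shared-file retention under incremental checkpointing.
// This is NOT Flink code; every name here is a hypothetical illustration.
object SharedStateModel {
  // A completed checkpoint: its id and the shared files it references.
  final case class Checkpoint(id: Long, sharedFiles: Set[String])

  // Keep only the newest `numRetained` checkpoints (older ones are subsumed).
  def retain(all: Seq[Checkpoint], numRetained: Int): Seq[Checkpoint] =
    all.sortBy(_.id).takeRight(numRetained)

  // A shared file has to stay as long as any retained checkpoint references it.
  def liveFiles(retained: Seq[Checkpoint]): Set[String] =
    retained.flatMap(_.sharedFiles).toSet

  // Files that can be deleted once the older checkpoints are subsumed.
  def deletable(all: Seq[Checkpoint], numRetained: Int): Set[String] =
    liveFiles(all) -- liveFiles(retain(all, numRetained))
}
```

For example, with chk-1 referencing files a and b, chk-2 referencing b and c, and chk-3 referencing c and d, retaining 2 checkpoints makes only file a deletable. A larger retention count keeps more old files alive, and a file written days ago can still be live if recent checkpoints keep referencing it.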
Hi Jiayi,
I understand that the shared folder stores state that belongs to multiple checkpoints. I think the shared folder should only retain data for the last “state.checkpoints.num-retained” checkpoints and remove the data of outdated checkpoints, shouldn't it?
In my case, I suspect that the state of outdated checkpoints wasn't cleaned up, which makes the shared folder keep growing even after the TTL has passed.

On Tue, 5 Nov 2019 at 21:13, bupt_ljy <[hidden email]> wrote:
> [...]

--
Best Wishes,
Shuwen Zhou
Hi Shuwen,
I think the problem is that you configured state TTL to clean up on full snapshots, which aren't executed when using RocksDB with incremental snapshots. Instead you need to activate `cleanupInRocksdbCompactFilter`:

val ttlConfig = StateTtlConfig
  .newBuilder(Time.minutes(30))
  .updateTtlOnCreateAndWrite()
  .cleanupInBackground()
  .cleanupInRocksdbCompactFilter()
  .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)

Cheers,
Till

On Tue, Nov 5, 2019 at 4:04 PM shuwen zhou <[hidden email]> wrote:
> [...]
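Till's point is that an expired entry only disappears when some cleanup strategy actually runs. A minimal sketch of that idea follows, as a toy in-memory model in plain Scala rather than Flink or RocksDB code. The `Entry` type, the helper names, and the exact expiry comparison are assumptions made for illustration.

```scala
// Toy model of lazy TTL cleanup. NOT Flink or RocksDB code; all names are hypothetical.
object TtlModel {
  // A stored value together with the time of its last write.
  final case class Entry(value: String, lastWriteMs: Long)

  // Assumed expiry rule: an entry is expired once its age reaches the TTL.
  def isExpired(e: Entry, nowMs: Long, ttlMs: Long): Boolean =
    nowMs - e.lastWriteMs >= ttlMs

  // "Return expired if not cleaned up": whatever is still stored is returned.
  def readReturnExpired(store: Map[String, Entry], key: String): Option[String] =
    store.get(key).map(_.value)

  // "Never return expired": expired entries are hidden, but they are still stored.
  def readNeverReturnExpired(store: Map[String, Entry], key: String,
                             nowMs: Long, ttlMs: Long): Option[String] =
    store.get(key).filterNot(isExpired(_, nowMs, ttlMs)).map(_.value)

  // A cleanup pass (compaction, in the RocksDB case) is what physically drops entries.
  def compact(store: Map[String, Entry], nowMs: Long, ttlMs: Long): Map[String, Entry] =
    store.filter { case (_, e) => !isExpired(e, nowMs, ttlMs) }
}
```

In this model the store only shrinks when `compact` is called. Until then, expired entries still occupy space and, with the visibility setting used in the thread, can still be read.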
@Till Rohrmann, I think just setting `cleanupInBackground()` should be enough for RocksDB to clean up in the compaction filter as of Flink 1.9.0 [1].

@Shuwen, I have several questions about the behavior you describe:
1. Is `flink-chk743e4568a70b626837b` the real checkpoint folder? I don't think a job ID would look like this.
2. Why do you have 10 checkpoints left under the checkpoint folder? Did you configure the number of retained checkpoints as 10?
3. What do you mean by "while I disabled incremental cleanup, the expired full snapshot will be removed automatically."? I cannot see that you configured the state TTL with `cleanupIncrementally()`. Moreover, what is the actual meaning of "removed automatically"?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#cleanup-in-background

Best
Yun Tang

On 11/5/19, 11:24 PM, "Till Rohrmann" <[hidden email]> wrote:
> [...]
Hi Yun and Till,
Thank you for your responses.

For @Yun:
1. No, I just renamed the checkpoint directory, since the real directory name contains company data. Sorry for the confusion.
2. Yes, I set the following in flink-conf.yaml:

state.checkpoints.num-retained: 10
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED

I was expecting that the shared folder would no longer contain outdated state. Since my TTL is set to 30 minutes, I shouldn't see data older than 1 day. However, I can still see such outdated data in the shared folder. For example, the current time is 2019-11-06 03:58:00 UTC, and I can see the following files on HDFS:

65.1 M  2019-11-04 17:58  /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/03dea380-758b-4d52-b335-5e6318ba6c40
 2.1 K  2019-11-04 17:28  /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/1205f112-f5ba-4516-ae32-1424afda08ac
65.1 M  2019-11-04 17:58  /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/2298e34d-8cdc-4f8a-aac0-76cf4b9ac0f5
65.1 M  2019-11-04 17:58  /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/25e58576-f86f-4ac9-83b8-08ce0be036c4
65.1 M  2019-11-05 17:42  /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/27031a93-3ae5-4247-a751-62552c29f325

3. What I actually meant is that, with incremental checkpointing disabled, only the latest 10 checkpoints, each containing the full state, are retained on HDFS. In my case that is around 20G per checkpoint. That way I could control how much data is stored on HDFS, rather than having an ever-growing shared folder. But it takes a lot of time to store the full state on HDFS, so I would still like to use incremental checkpoints.

For @Till:
I will try cleanupInRocksdbCompactFilter to see if it works. Thank you.

On Wed, 6 Nov 2019 at 01:50, Yun Tang <[hidden email]> wrote:
> [...]

--
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
Hi Shuwen
You have just 10 “chk-” folders, as expected, and when a checkpoint is subsumed, its “chk-” folder is removed only after its shared state has been removed successfully [1]. That is to say, I think you might not have too many orphan state files left. To verify this, you could use the State Processor API [2] to load your checkpoints and compare the files they reference with all the files under the “shared” folder, to see whether there are many orphan files. If this is true, we might need to think about the custom compaction filter feature of FRocksDB.

Secondly, your estimate of “20GB each checkpoint” might not be accurate when RocksDB incremental checkpointing is enabled, because the UI shows only the incremental size [3]. I suggest summing the sizes of the files referenced in your checkpoint metadata to get the accurate size of each checkpoint.

Last but not least, RocksDB’s compaction filter feature deletes expired data only during compaction [4]. I’m afraid you might need to look at RocksDB’s LOG file on the task managers to see how frequently compaction runs. I also think the growing size might be related to the interval of your checkpoints. What is your checkpoint interval?

[1] https://github.com/apache/flink/blob/2ea14169a1997434d45d6f1da6dfe9acd6bd8da3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L264
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
[3] https://issues.apache.org/jira/browse/FLINK-13390
[4] https://github.com/facebook/rocksdb/blob/834feaff05a4bf7ae49c736305d5eb180aed4011/include/rocksdb/compaction_filter.h#L61

Best
Yun Tang

From: shuwen zhou <[hidden email]>
Date: Wednesday, November 6, 2019 at 12:02 PM
To: dev <[hidden email]>, Yun Tang <[hidden email]>, Till Rohrmann <[hidden email]>
Subject: Re: RocksDB state on HDFS seems not being cleanned up

> [...]
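The comparison Yun suggests boils down to a set difference between the files listed under “shared” and the files referenced by the retained checkpoints. Here is a minimal sketch in plain Scala. It assumes both lists have already been collected as path strings (for example from an HDFS listing and from the checkpoint metadata); how they are collected is not shown, and the `OrphanCheck` object and its method names are hypothetical.

```scala
// Sketch of an orphan-file check. NOT Flink code; all names are hypothetical.
object OrphanCheck {
  // Compare by file name, since the two sources may use different path
  // prefixes (e.g. "/flink/..." versus "hdfs://hadoop/flink/...").
  def fileName(path: String): String =
    path.substring(path.lastIndexOf('/') + 1)

  // Files present under "shared" that no retained checkpoint references.
  def orphans(listedInShared: Iterable[String],
              referencedByRetained: Iterable[String]): Set[String] = {
    val referenced = referencedByRetained.map(fileName).toSet
    listedInShared.map(fileName).filterNot(referenced).toSet
  }
}
```

One caveat for such a check: a file uploaded by a checkpoint that is still in progress may not be referenced by any completed checkpoint yet, so very recent files are not necessarily orphans.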
Hi Yun,
Thank you for your detailed explanation,It brings me a lot to research. I think 1. I should try reduce number of "state.checkpoints.num-retained", maybe to 3, which could decrease amount of shared folder. 2. Does Flink 1.9.0 has the possibility of orphan files? Seems the answer is yes, maybe. I could have use the state process API you mentioned to figure it out and get back to you. 3. I have a look in file /flink/c344b61c456af743e4568a70b626837b/chk-172/_metadata, there are a lot file names like hdfs://hadoop/flink/c344b61c456af743e4568a70b626837b/shared/e9e10c8a-6d73-48e4-9e17-45838d276b03, sum those file's size up is the total size of each chekpoint, am I correct? 4. My checkpoint interval is 16 minutes. On Wed, 6 Nov 2019 at 15:57, Yun Tang <[hidden email]> wrote: > Hi Shuwen > > > > Since you just have 10 “chk-“ folders as expected and when subsuming > checkpoints, the “chk-” folder would be removed after we successfully > removed shared state [1]. That is to say, I think you might not have too > many orphan states files left. To ensure this, you could use state process > API [2] to load your checkpoints and compare all the files under “shared” > folder to see whether there existed too many orphan files. If this is true, > we might think of the custom compaction filter future of FRocksDB. > > > > Secondly, your judgment of “20GB each checkpoint” might not be accurate > when RocksDB incremental checkpoint is enabled, the UI showed is only the > incremental size [3], I suggest you to count your files’s size within your > checkpoint meta to know the accurate checkpoint size for each checkpoint. > > > > Last but not least, RocksDB’s future of compaction filter to delete > expired data only happened during compaction [4], I’m afraid you might need > to look up your rocksDB’s LOG file to see the frequency of compaction on > task managers. 
And I think the increasing size might be related with the > interval of your checkpoints, what the interval when you executing > checkpoints? > > > > > > [1] > https://github.com/apache/flink/blob/2ea14169a1997434d45d6f1da6dfe9acd6bd8da3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L264 > > [2] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html > > [3] https://issues.apache.org/jira/browse/FLINK-13390 > > [4] > https://github.com/facebook/rocksdb/blob/834feaff05a4bf7ae49c736305d5eb180aed4011/include/rocksdb/compaction_filter.h#L61 > > > > Best > > Yun Tang > > > > *From: *shuwen zhou <[hidden email]> > *Date: *Wednesday, November 6, 2019 at 12:02 PM > *To: *dev <[hidden email]>, Yun Tang <[hidden email]>, Till > Rohrmann <[hidden email]> > *Subject: *Re: RocksDB state on HDFS seems not being cleanned up > > > > Hi Yun and Till, > > Thank you for your response. > > For @Yun > 1. No, I just renamed the checkpoint directory name since the directory > name contains company data. Sorry for the confusion. > > 2. Yes, I set > > *state.checkpoints.num-retained: *10 > *state.backend.rocksdb.predefined-options: *FLASH_SSD_OPTIMIZED > > In flink.conf > > I was expecting, shared folder will no longer contains outdated state, since my TTL is set to 30 mins, I shouldn't have seen date older than 1 day. 
However I could still see those outdated data in shared folder > > For example, current time is 2019-11-06 03:58:00 UTC, I could see following file on HDFS > > 65.1 M 2019-11-04 17:58 > /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/03dea380-758b-4d52-b335-5e6318ba6c40 > 2.1 K 2019-11-04 17:28 > /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/1205f112-f5ba-4516-ae32-1424afda08ac > 65.1 M 2019-11-04 17:58 > /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/2298e34d-8cdc-4f8a-aac0-76cf4b9ac0f5 > 65.1 M 2019-11-04 17:58 > /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/25e58576-f86f-4ac9-83b8-08ce0be036c4 > > 65.1 M 2019-11-05 17:42 > /flink/checkpoint/c344b61c456af743e4568a70b626837b/shared/27031a93-3ae5-4247-a751-62552c29f325 > > > 3.I actually mean that, only latest 10 checkpoint containing full state will be retained on HDFS. In my case, around 20G for each checkpoint. In such way I could have control on how much data was stored on HDFS, Rather than having a increasing shared folder. > > But it takes a lot of time to store full state on HDFS. Thus I would still like to use incremental. > > > > > > For @Till > > I would have a try on cleanupInRocksdbCompactFilter to see if it works. Thank you. > > > > On Wed, 6 Nov 2019 at 01:50, Yun Tang <[hidden email]> wrote: > > @Till Rohrmann , I think just set `cleanupInBackground()` should be enough > for RocksDB to clean up in compaction filter after Flink-1.9.0 [1] > > @Shuwen , I have several questions for your behavior: > 1. Is the ` flink-chk743e4568a70b626837b` real folder for checkpoints? I > don't think a job-id would act like this. > 2. why you have 10 checkpoints left under checkpoint folder, did you > configure the retained checkpoints as 10? > 3. what do you mean "while I disabled incremental cleanup, the expired > full snapshot will be removed automatically." ? 
> I cannot see that you have configured the state TTL with `cleanupIncrementally()`. Moreover, what is the actual meaning of "removed automatically"?
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#cleanup-in-background
>
> Best
> Yun Tang
>
> On 11/5/19, 11:24 PM, "Till Rohrmann" <[hidden email]> wrote:
>
> Hi Shuwen,
>
> I think the problem is that you configured state TTL to clean up on full snapshots, which aren't executed when using RocksDB with incremental snapshots. Instead you need to activate `cleanupInRocksdbCompactFilter`:
>
> import org.apache.flink.api.common.state.StateTtlConfig
> import org.apache.flink.api.common.time.Time
>
> val ttlConfig = StateTtlConfig
>   .newBuilder(Time.minutes(30))
>   .updateTtlOnCreateAndWrite()
>   .cleanupInBackground()
>   .cleanupInRocksdbCompactFilter()
>   .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
>   .build()
>
> Cheers,
> Till
>
> On Tue, Nov 5, 2019 at 4:04 PM shuwen zhou <[hidden email]> wrote:
>
> > Hi Jiayi,
> > I understand that the shared folder stores state belonging to multiple checkpoints. I think the shared folder should only retain data across “state.checkpoints.num-retained” checkpoints and remove outdated checkpoints, shouldn't it?
> > In my case I suspect that the state of outdated checkpoints wasn't cleaned up, which makes the shared folder keep growing even after the TTL has passed.
> >
> > On Tue, 5 Nov 2019 at 21:13, bupt_ljy <[hidden email]> wrote:
> >
> > > Hi Shuwen,
> > >
> > > The “shared” means that the state files are shared among multiple checkpoints, which happens when you enable incremental checkpointing[1].
> > > Therefore, it’s reasonable that the size keeps growing if you set “state.checkpoints.num-retained” to a big value.
> > >
> > > [1] https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
> > >
> > > Best,
> > > Jiayi Liao
> > >
> > > Original Message
> > > Sender: shuwen zhou<[hidden email]>
> > > Recipient: dev<[hidden email]>
> > > Date: Tuesday, Nov 5, 2019 17:59
> > > Subject: RocksDB state on HDFS seems not being cleanned up
> > >
> > > Hi Community,
> > > I have a job running on Flink 1.9.0 on YARN with RocksDB on HDFS with incremental checkpointing enabled. I have some MapState in my code with the following config:
> > >
> > > val ttlConfig = StateTtlConfig
> > >   .newBuilder(Time.minutes(30))
> > >   .updateTtlOnCreateAndWrite()
> > >   .cleanupInBackground()
> > >   .cleanupFullSnapshot()
> > >   .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
> > >   .build()
> > >
> > > After running for around 2 days, I observed that the checkpoint folder shows:
> > >
> > > 44.4 M   /flink-chk743e4568a70b626837b/chk-40
> > > 65.9 M   /flink-chk743e4568a70b626837b/chk-41
> > > 91.7 M   /flink-chk743e4568a70b626837b/chk-42
> > > 96.1 M   /flink-chk743e4568a70b626837b/chk-43
> > > 48.1 M   /flink-chk743e4568a70b626837b/chk-44
> > > 71.6 M   /flink-chk743e4568a70b626837b/chk-45
> > > 50.9 M   /flink-chk743e4568a70b626837b/chk-46
> > > 90.2 M   /flink-chk743e4568a70b626837b/chk-37
> > > 49.3 M   /flink-chk743e4568a70b626837b/chk-38
> > > 96.9 M   /flink-chk743e4568a70b626837b/chk-39
> > > 797.9 G  /flink-chk743e4568a70b626837b/shared
> > >
> > > The ./shared folder keeps growing and does not seem to be cleaned up. However while I disabled incremental cleanup, the expired full snapshot will be removed automatically. Is there any way to remove outdated state on HDFS to stop it from increasing? Thanks.
> > >
> > > --
> > > Best Wishes,
> > > Shuwen Zhou
> >
> > --
> > Best Wishes,
> > Shuwen Zhou
>
> --
> Best Wishes,
> Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>

--
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/> |
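For readers following the thread, the behaviour Jiayi describes for the shared folder can be pictured with a small reference-counting model. The sketch below is a toy in plain Scala, not Flink's actual implementation: `SharedFileRegistry`, `complete` and `liveFiles` are hypothetical names invented for illustration. It assumes that each completed incremental checkpoint lists the shared files it references, and that a shared file becomes deletable only once no retained checkpoint references it any more. Under that assumption, an old file can legitimately remain in ./shared for as long as some retained checkpoint still points to it.

```scala
import scala.collection.mutable

// Toy model only (hypothetical names, not Flink classes):
// tracks how many retained checkpoints reference each shared file.
final class SharedFileRegistry(numRetained: Int) {
  private val refCounts = mutable.Map.empty[String, Int]
  private val retained = mutable.Queue.empty[(Long, Set[String])]

  /** Registers a completed checkpoint; returns the files that became deletable. */
  def complete(checkpointId: Long, files: Set[String]): Set[String] = {
    // Every file referenced by the new checkpoint gains one reference.
    files.foreach(f => refCounts.update(f, refCounts.getOrElse(f, 0) + 1))
    retained.enqueue((checkpointId, files))

    val deleted = mutable.Set.empty[String]
    // Drop the oldest checkpoints until only numRetained remain.
    while (retained.size > numRetained) {
      val (_, oldFiles) = retained.dequeue()
      oldFiles.foreach { f =>
        val remaining = refCounts(f) - 1
        if (remaining == 0) {
          // No retained checkpoint references this file any more.
          refCounts.remove(f)
          deleted += f
        } else {
          refCounts.update(f, remaining)
        }
      }
    }
    deleted.toSet
  }

  /** Files still referenced by at least one retained checkpoint. */
  def liveFiles: Set[String] = refCounts.keySet.toSet
}
```

With `numRetained = 2` and checkpoints 1 to 4 referencing `{sst-a}`, `{sst-a, sst-b}`, `{sst-b, sst-c}` and `{sst-c, sst-d}`, nothing is freed when checkpoint 1 is dropped, because checkpoint 2 still references `sst-a`. The file is only freed once checkpoint 2 is dropped as well. In this model the age of a file says nothing about whether it can be deleted; only the references from retained checkpoints do.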