Hi, all!
At the Flink master branch we now support state ttl for SQL mini-batch deduplication, using the incremental cleanup strategy on the heap backend (see FLINK-16581). To test the performance of this feature, I compiled the master branch code, deployed the jar to our production environment, and ran three tests:

flink 1.9.0 release version, state ttl enabled
flink 1.11-snapshot version, state ttl disabled
flink 1.11-snapshot version, state ttl enabled

The test query is as follows:

    select order_date,
           sum(price * amount - goods_all_fav_amt - virtual_money_amt + goods_carriage_amt) as saleP,
           sum(amount) as saleN,
           count(distinct parent_sn) as orderN,
           count(distinct user_id) as cusN
    from (
        select order_date, user_id,
               order_type, order_status, terminal, last_update_time, goods_all_fav_amt,
               goods_carriage_amt, virtual_money_amt, price, amount,
               order_quality, quality_goods_cnt, acture_goods_amt
        from (select *,
                     row_number() over (partition by order_id, order_goods_id order by proctime desc) as rownum
              from dm_trd_order_goods)
        where rownum = 1
          and (order_type in (1,2,3,4,5) or order_status = 70)
          and terminal = 'shop' and price > 0)
    group by order_date

At runtime this query generates two operators, Deduplication and GroupAgg. The configuration is the same in all tests: parallelism 20, the Kafka consumer reads from the earliest offset, and the mini-batch function is disabled. The test results are as follows:

flink 1.9.0, state ttl enabled: the test lasted 44m, Flink received 13.74 million records, average tps around 5200/s (Flink UI screenshots: back pressure, checkpoint)
flink 1.11-snapshot, state ttl disabled: the test lasted 28m, Flink received 8.83 million records, average tps around 5200/s (Flink UI screenshots: back pressure, checkpoint)
flink 1.11-snapshot, state ttl enabled: the test lasted 1h 43m, but Flink received only 1.68 million records because of serious back pressure on the deduplication operator; average tps was around 270/s, and checkpoints kept failing for the same reason (Flink UI screenshots: back pressure, checkpoint)

Deduplication state cleanup is implemented with timers in Flink 1.9.0, but the 1.11-snapshot version uses StateTtlConfig; this is the main difference. Comparing the three tests, 1.11-snapshot with state ttl disabled performs the same as 1.9.0 with state ttl enabled. However, with state ttl enabled in 1.11-snapshot the throughput drops by nearly 20x, so I think the incremental cleanup strategy causes this problem. What do you think? @azagrebin, @jark.

Thanks.

lsyldliu

Zhejiang University, College of Control Science and Engineering, CSC
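For reference, this is roughly how state retention was switched between the runs. The job code itself is not included in this thread, so the class name and the retention values below are only illustrative; the real test ran the query above against our Kafka source.

    // Sketch of the test job setup (illustrative, not the actual job code).
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class DedupTtlTestJob {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(20);

            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

            // Idle state retention drives the state ttl used by the SQL operators:
            // leaving it at the default (0) disables ttl, a non-zero range enables it.
            tEnv.getConfig().setIdleStateRetentionTime(Time.hours(1), Time.hours(2));

            // register the Kafka source table dm_trd_order_goods here,
            // then submit the deduplication + aggregation query shown above
        }
    }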
Hi lsyldliu,
Thanks for investigating this.

First of all, if you are using mini-batch deduplication, it does not support state ttl in 1.9. That's why the tps looks the same as 1.11 with state ttl disabled. We only introduced state ttl for mini-batch deduplication recently.

Regarding the performance regression, it looks very surprising to me: performance drops by 19x when StateTtlConfig is enabled in 1.11. I don't have much experience with the internals of StateTtlConfig, so I'm looping in @Yu Li <[hidden email]> and @YunTang in CC, who may have more insight on this.

For more information, we use the following StateTtlConfig [1] in the blink planner:

    StateTtlConfig
        .newBuilder(Time.milliseconds(retentionTime))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

Best,
Jark

[1]: https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java#L27
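For readers following along: the config above is not used on its own; it is attached to the operator's keyed state through a state descriptor. Below is a minimal sketch of that wiring. The class and state names are illustrative, not the actual blink runtime classes; only StateTtlConfig and the descriptor API are real.

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.types.Row;

    public class TtlStateSketch {
        // Builds a descriptor whose state is wrapped with the ttl logic above.
        public static ValueStateDescriptor<Row> createDedupStateDescriptor(long retentionTime) {
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.milliseconds(retentionTime))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();

            ValueStateDescriptor<Row> descriptor =
                    new ValueStateDescriptor<>("dedup-last-row", Row.class);
            // Every read/write of state created from this descriptor now goes
            // through the ttl-aware wrapper, including the incremental cleanup checks.
            descriptor.enableTimeToLive(ttlConfig);
            return descriptor;
        }
    }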
Hi Jark,

I used the non-mini-batch deduplication and group aggregation for these tests. In the current 1.11 master branch, the state ttl implementation of the non-mini-batch deduplication has been refactored to use StateTtlConfig instead of timers; that PR is my work. I am also surprised by the 19x performance drop.

刘大龙 (lsyldliu)
Zhejiang University, College of Control Science and Engineering, Institute of Intelligent Systems and Control, Room 217, New Industrial Control Building
Address: Yuquan Campus, Zhejiang University, 38 Zheda Road, Hangzhou, Zhejiang
Tel: 18867547281
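To make the difference concrete, the 1.9-style cleanup that was replaced looked conceptually like the sketch below: one processing-time timer per key clears the state when the retention elapses. This is a heavily simplified illustration, not the actual 1.9 deduplication operator, and the class and state names are made up.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    public class TimerCleanupDedupSketch extends KeyedProcessFunction<String, Row, Row> {
        private final long retentionMillis;
        private transient ValueState<Row> lastRow;

        public TimerCleanupDedupSketch(long retentionMillis) {
            this.retentionMillis = retentionMillis;
        }

        @Override
        public void open(Configuration parameters) {
            lastRow = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("last-row", Row.class));
        }

        @Override
        public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
            if (lastRow.value() == null) {
                // register a cleanup timer when the state for this key is first created
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + retentionMillis);
            }
            lastRow.update(row);
            out.collect(row);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) {
            // a single cleanup action per key, exactly when the retention elapses
            lastRow.clear();
        }
    }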
Hi lsyldliu,
You can try to tune the StateTtlConfig. As the documentation notes [1], TTL incremental cleanup can decrease per-record performance; this is the price of the automatic cleanup. If working with state is the dominant cost in your operator, then even checking one additional record for cleanup roughly doubles the state work per access.

The timer approach was discussed in the TTL feature design. It needs an additional implementation and keeps more state, but it performs only one cleanup action, exactly when needed, so it is a performance/storage trade-off.

Anyway, a 20x degradation is indeed a lot. As a first step, I would suggest configuring the incremental cleanup explicitly in `StateTtlConfigUtil#createTtlConfig` with fewer entries to check, e.g. 1, because processFirstRow/processLastRow already access the state twice and do cleanup:

    .cleanupIncrementally(1, false)

Also, I am not sure, but depending on the input data, finishBundle can happen mostly during snapshotting, which slows down taking the checkpoint. Could this fail the checkpoint, accumulate back pressure, and slow down the pipeline?

I am also not sure why the deduplication data is kept in a Java map and in Flink state at the same time. Why not keep it only in Flink state and deduplicate on each incoming record?

Best,
Andrey

[1] note 2 in https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#incremental-cleanup
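Concretely, with that change the builder in StateTtlConfigUtil#createTtlConfig would look roughly like the sketch below. This is only a sketch: the retention value is an example, and whether the second cleanup parameter should stay false is exactly the kind of thing worth measuring.

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;

    public class TtlTuningSketch {
        public static StateTtlConfig createTtlConfig(long retentionTime) {
            return StateTtlConfig
                    .newBuilder(Time.milliseconds(retentionTime))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    // check only one extra entry per state access and do not
                    // run an additional cleanup pass for every processed record
                    .cleanupIncrementally(1, false)
                    .build();
        }
    }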
Hi Andrey,
Thanks for the tuning ideas. Let me explain the design of the deduplication operator.

The mini-batch implementation of deduplication buffers a bundle of input data on the heap (in a Java Map). When the bundle hits the trigger size or trigger time, the buffered data is processed together, so we only need to access the state once per key. This is designed for the RocksDB state backend, to reduce frequent state access and (de)serialization. And yes, this may slow down the checkpoint, but the suggested mini-batch timeout is <= 10s; from our production experience it doesn't have much impact on checkpointing.

Best,
Jark
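For readers who have not looked at the operator, the bundle pattern described above works roughly like the sketch below. It is deliberately simplified, not the actual mini-batch deduplication function, and the state interface is made up for illustration.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.types.Row;

    public class DedupBundleSketch {
        // Heap buffer: the last row seen for each key within the current bundle.
        private final Map<String, Row> bundle = new HashMap<>();

        // Called for every input record: only the heap map is touched.
        public void addRecord(String key, Row row) {
            bundle.put(key, row);
        }

        // Called when the bundle trigger (size or timeout) fires:
        // one keyed-state access per distinct key instead of one per record.
        public void finishBundle(DedupState state) throws Exception {
            for (Map.Entry<String, Row> entry : bundle.entrySet()) {
                state.update(entry.getKey(), entry.getValue());
            }
            bundle.clear();
        }

        // Stand-in for the keyed state accessor; assumed interface for this sketch.
        public interface DedupState {
            void update(String key, Row row) throws Exception;
        }
    }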