-----Original Message-----
From: "刘大龙" <[hidden email]>
Sent: 2020-05-06 17:55:25 (Wednesday)
To: "Jark Wu" <[hidden email]>
Cc:
Subject: Re: Re: Re: The use of state ttl incremental cleanup strategy in sql deduplication resulting in significant performance degradation

Thanks for your tuning ideas, I will test them later. Just to emphasize, I use non-mini-batch deduplication for the tests.

-----Original Message-----
From: "Jark Wu" <[hidden email]>
Sent: 2020-05-05 10:48:27 (Tuesday)
To: dev <[hidden email]>
Cc: "刘大龙" <[hidden email]>, "Yu Li" <[hidden email]>, "Yun Tang" <[hidden email]>
Subject: Re: Re: The use of state ttl incremental cleanup strategy in sql deduplication resulting in significant performance degradation

Hi Andrey,

Thanks for the tuning ideas. Let me explain the design of deduplication.

The mini-batch implementation of deduplication buffers a bundle of input data on the heap (in a Java Map). When the bundle hits the trigger size or trigger time, the buffered data is processed together, so we only need to access the state once per key. This is designed for the RocksDB state backend to reduce frequent state accesses and (de)serialization. And yes, this may slow down the checkpoint, but the suggested mini-batch timeout is <= 10s. From our production experience, it doesn't have much impact on checkpoints.

Best,
Jark

On Tue, 5 May 2020 at 06:48, Andrey Zagrebin <[hidden email]> wrote:

Hi lsyldliu,

You can try to tune the StateTtlConfig. As the documentation suggests [1], the TTL incremental cleanup can decrease the per-record performance. This is the price of the automatic cleanup. If the only thing your operator mostly does is work with state, then checking even one additional record for cleanup doubles the amount of work per record.

The timer approach was discussed in the TTL feature design. It needs an additional implementation and keeps more state, but it performs only one cleanup action exactly when needed, so it is a performance/storage trade-off.

Anyway, a 20x degradation does indeed look like a lot. As a first step, I would suggest configuring the incremental cleanup explicitly in `StateTtlConfigUtil#createTtlConfig` with fewer entries to check, e.g. 1, because processFirstRow/processLastRow already access the state twice and do cleanup:

.cleanupIncrementally(1, false)

Also, I am not sure, but depending on the input data, finishBundle may happen mostly during snapshotting, which slows down taking the checkpoint. Could this fail the checkpoint, accumulate back pressure, and slow down the pipeline?

I am also not sure why the deduplication data is kept in a Java map and in Flink state at the same time; why not keep it only in Flink state and deduplicate on each incoming record?

Best,
Andrey

[1] note 2 in
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#incremental-cleanup
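For reference, a minimal sketch of what this suggested tuning could look like when applied to the TTL config built for the deduplication state. The class name and the createTtlConfig signature are assumptions here; the builder calls and cleanupIncrementally(1, false) are the ones discussed in this thread.

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;

    public class StateTtlConfigUtil {

        // Hypothetical shape of createTtlConfig with the suggested tuning applied.
        public static StateTtlConfig createTtlConfig(long retentionTime) {
            return StateTtlConfig
                .newBuilder(Time.milliseconds(retentionTime))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // check only 1 extra entry for expiration per state access,
                // and do not run an additional cleanup pass for every processed record
                .cleanupIncrementally(1, false)
                .build();
        }
    }

With cleanupIncrementally(1, false), each state access checks only one additional entry for expiration and no extra cleanup pass runs per processed record, which is the lightest incremental-cleanup setting to compare against the timer-based behaviour.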
On Wed, Apr 29, 2020 at 11:53 AM 刘大龙 <[hidden email]> wrote:

> > -----Original Message-----
> > From: "Jark Wu" <[hidden email]>
> > Sent: 2020-04-29 14:09:44 (Wednesday)
> > To: dev <[hidden email]>, "Yu Li" <[hidden email]>, [hidden email]
> > Cc: [hidden email]
> > Subject: Re: The use of state ttl incremental cleanup strategy in sql deduplication resulting in significant performance degradation
> >
> > Hi lsyldliu,
> >
> > Thanks for investigating this.
> >
> > First of all, if you are using mini-batch deduplication, it does not support state TTL in 1.9. That's why the TPS looks the same as 1.11 with state TTL disabled. We only introduced state TTL for mini-batch deduplication recently.
> >
> > Regarding the performance regression, it looks very surprising to me. The performance is reduced by 19x when StateTtlConfig is enabled in 1.11. I don't have much experience with the internals of StateTtlConfig, so I am looping in @Yu Li <[hidden email]> and @Yun Tang in CC, who may have more insights on this.
> >
> > For more information, we use the following StateTtlConfig [1] in the blink planner:
> >
> > StateTtlConfig
> >     .newBuilder(Time.milliseconds(retentionTime))
> >     .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >     .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> >     .build();
> >
> > Best,
> > Jark
> >
> > [1]:
> > https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java#L27
> >
> > On Wed, 29 Apr 2020 at 11:53, 刘大龙 <[hidden email]> wrote:
> >
> > > Hi, all!
> > >
> > > On the Flink master branch, we have supported state TTL for SQL mini-batch deduplication using the incremental cleanup strategy on the heap backend; refer to FLINK-16581. Because I wanted to test the performance of this feature, I compiled the master branch code, deployed the jar to our production environment, and then ran three types of tests:
> > >
> > > Flink 1.9.0 release version with state TTL enabled
> > > Flink 1.11-SNAPSHOT version with state TTL disabled
> > > Flink 1.11-SNAPSHOT version with state TTL enabled
> > >
> > > The test query SQL is as follows:
> > >
> > > select order_date,
> > >     sum(price * amount - goods_all_fav_amt - virtual_money_amt + goods_carriage_amt) as saleP,
> > >     sum(amount) as saleN,
> > >     count(distinct parent_sn) as orderN,
> > >     count(distinct user_id) as cusN
> > > from (
> > >     select order_date, user_id,
> > >         order_type, order_status, terminal, last_update_time, goods_all_fav_amt,
> > >         goods_carriage_amt, virtual_money_amt, price, amount,
> > >         order_quality, quality_goods_cnt, acture_goods_amt
> > >     from (select *, row_number() over(partition by order_id, order_goods_id order by proctime desc) as rownum from dm_trd_order_goods)
> > >     where rownum = 1
> > >     and (order_type in (1,2,3,4,5) or order_status = 70)
> > >     and terminal = 'shop' and price > 0)
> > > group by order_date
> > >
> > > At runtime, this query generates two operators, a Deduplication and a GroupAgg. In the tests the configuration is the same: parallelism is 20, the Kafka consumer starts from the earliest offset, and the mini-batch function is disabled. The test results are as follows:
> > >
> > > Flink 1.9.0 with state TTL enabled: this test lasted 44m, Flink received 13.74 million records, average TPS around 5200/s (Flink UI picture links: back pressure, checkpoint).
> > > Flink 1.11-SNAPSHOT with state TTL disabled: this test lasted 28m, Flink received 8.83 million records, average TPS around 5200/s (Flink UI picture links: back pressure, checkpoint).
> > > Flink 1.11-SNAPSHOT with state TTL enabled: this test lasted 1h 43m, Flink received only 1.68 million records because of serious back pressure on the deduplication operator, average TPS around 270/s; moreover, the checkpoints always failed because of that back pressure (Flink UI picture links: back pressure, checkpoint).
> > >
> > > Deduplication state cleanup in Flink 1.9.0 is implemented with timers, but the 1.11-SNAPSHOT version uses StateTtlConfig; this is the main difference.
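For comparison with the StateTtlConfig-based cleanup above, a simplified sketch of the timer-based cleanup idea follows. This is not the actual Flink 1.9 deduplication operator code; the class, state, and parameter names are illustrative only. The idea: keep the latest row per key, (re)register one cleanup timer on every write, and drop the state when the timer fires.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    // Illustrative only: timer-based state retention for a keep-last-row deduplication.
    public class TimerCleanupDeduplicate extends KeyedProcessFunction<String, Row, Row> {

        private final long retentionTime;               // state retention in milliseconds
        private transient ValueState<Row> lastRow;      // deduplicated row per key
        private transient ValueState<Long> cleanupTime; // pending cleanup timestamp per key

        public TimerCleanupDeduplicate(long retentionTime) {
            this.retentionTime = retentionTime;
        }

        @Override
        public void open(Configuration parameters) {
            lastRow = getRuntimeContext().getState(new ValueStateDescriptor<>("lastRow", Row.class));
            cleanupTime = getRuntimeContext().getState(new ValueStateDescriptor<>("cleanupTime", Long.class));
        }

        @Override
        public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
            lastRow.update(row);
            out.collect(row);

            // move the single cleanup timer forward instead of cleaning on every access
            Long oldCleanup = cleanupTime.value();
            if (oldCleanup != null) {
                ctx.timerService().deleteProcessingTimeTimer(oldCleanup);
            }
            long newCleanup = ctx.timerService().currentProcessingTime() + retentionTime;
            ctx.timerService().registerProcessingTimeTimer(newCleanup);
            cleanupTime.update(newCleanup);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
            // exactly one cleanup action, fired when the retention time has elapsed
            lastRow.clear();
            cleanupTime.clear();
        }
    }

This approach keeps extra state per key (the pending cleanup timestamp and the registered timer) but performs exactly one cleanup action when the retention time elapses, which is the performance/storage trade-off Andrey describes above.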
> > > Comparing the three tests comprehensively, we can see that with state TTL disabled in 1.11-SNAPSHOT, the performance is the same as 1.9.0 with state TTL enabled. However, with state TTL enabled in 1.11-SNAPSHOT, the performance drops by nearly 20 times, so I think the incremental cleanup strategy causes this problem. What do you think about it? @azagrebin, @jark.
> > >
> > > Thanks.
> > >
> > > lsyldliu
> > > Zhejiang University, College of Control Science and Engineering, CSC
> >
> ------------------------------
> 刘大龙
> Zhejiang University, Department of Control Science and Engineering, Institute of Intelligent Systems and Control, Room 217, Industrial Control New Building
> Address: Yuquan Campus, Zhejiang University, 38 Zheda Road, Hangzhou, Zhejiang
> Tel: 18867547281
>
> Hi Jark,
> I use non-mini-batch deduplication and group aggregation for the tests. The non-mini-batch deduplication state TTL implementation has been refactored to use StateTtlConfig instead of a timer in the current 1.11 master branch; that PR is my work. I am also surprised by the 19x performance drop.

Best regards,
lsyldliu

刘大龙
Zhejiang University, Department of Control Science and Engineering, Institute of Intelligent Systems and Control, Room 217, Industrial Control New Building
Address: Yuquan Campus, Zhejiang University, 38 Zheda Road, Hangzhou, Zhejiang
Tel: 18867547281