Hi Robert,
Comments inline.

> On Feb 28, 2020, at 2:51 AM, Robert Metzger <[hidden email]> wrote:
>
> Sorry for the late reply.
>
> There's not much you can do at the moment, as Flink needs to sync on the checkpoint barriers.
> There's something in the making for addressing the issue soon: https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints

If I understand correctly, we need to make sure that when the state snapshot is called, in-flight records between barriers from different channels have been "materialized" (processed and pushed downstream before the snapshot is called).
More specifically, if we honor watermark progression and operator snapshots (barriers aligned), and drain the records that were processed out of order before actually taking the snapshot, will it work correctly? Details here: https://github.com/apache/flink/pull/11267/files

> Did you try out using the FsStateBackend?

As far as we know, it is a skewed key that makes RocksDB state updates slow. Ran can probably share more at Flink Forward 2020 :)

> If you are going to stick with rocks, I would recommend to understand what exactly causes the poor performance. I see the following areas:
> - serialization costs
> - disk / ssd speed
> - network speed (during checkpoint creation) (as Yu mentioned)
> - if you have asynchronous checkpoints enabled, they will also slow down the processing.
>
>
> On Sun, Feb 23, 2020 at 8:27 PM Chen Qin <[hidden email]> wrote:
> Just to follow up on this thread: it is actually caused by key skew. Given that a single subtask is single-threaded, 5% of slow processing causes the entire job to back-pressure on the RocksDBStateBackend.
>
> Robert,
>
> What is blocking us from enabling multi-threading in the processor? I recall it has something to do with barriers and record ordering. Can you share more insights on this?
>
> Chen
>
>> On Feb 21, 2020, at 4:56 AM, Robert Metzger <[hidden email]> wrote:
>>
>> I would try the FsStateBackend in this scenario, as you have enough memory available.
>>
>> On Thu, Jan 30, 2020 at 5:26 PM Ran Zhang <[hidden email]> wrote:
>> Hi Gordon,
>>
>> Thanks for your reply! Regarding state size: we are at 200-300 GB, but we have a parallelism of 120, so each task handles ~2-3 GB of state. (When we submit the job we set the TM memory to 15g.) In this scenario, what would be the best fit for the state backend?
>>
>> Thanks,
>> Ran
>>
>> On Wed, Jan 29, 2020 at 6:37 PM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
>> Hi Ran,
>>
>> On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang <[hidden email]> wrote:
>> Hi all,
>>
>> We have a Flink app that uses a KeyedProcessFunction. The function requires a ValueState (of TreeSet), and the processElement method needs to access and update it. We tried to use RocksDB as our state backend, but the performance is not good; intuitively, we think this is because of the serialization / deserialization on each processElement call.
>>
>> As you have already pointed out, serialization behaviour is a major difference between the 2 state backends, and will directly impact performance due to the extra runtime overhead in RocksDB.
>> If you plan to continue using the RocksDB state backend, make sure to use MapState instead of ValueState where possible, since every access to the ValueState in the RocksDB backend requires serializing / deserializing the whole value.
>> For MapState, de-/serialization happens per K-V access. Whether or not this makes sense would of course depend on your state access pattern.
>>
>> Then we tried switching to the FsStateBackend (which keeps the in-flight data in the TaskManager's memory, according to the docs), and it resolved the performance issue.
>> So we want to better understand the tradeoffs in choosing between these 2 state backends. Our checkpoint size is 200-300 GB in stable state. For now we know that one benefit of RocksDB is that it supports incremental checkpoints, but we would love to know what else we lose by choosing the FsStateBackend.
>>
>> As of now, feature-wise both backends support asynchronous snapshotting, state schema evolution, and access via the State Processor API.
>> In the end, the major factor for deciding between the two state backends would be your expected state size.
>> That being said, it could be possible in the future that savepoint formats for the backends are changed to be compatible, meaning that you will be able to switch between different backends upon restore [1].
>>
>> Thanks a lot!
>> Ran Zhang
>>
>> Cheers,
>> Gordon
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State
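
To make the ValueState vs. MapState point from the thread more concrete, here is a minimal sketch in plain Python. It is not Flink code and does not use any Flink API: the names ValueStateSim, MapStateSim and compare are hypothetical, pickle merely stands in for Flink's serializers, and the byte counter is only a rough proxy for the work a serializing backend does. It assumes, as described above, that a ValueState access (de)serializes the whole value while a MapState access only (de)serializes the entry it touches.

```python
import pickle


class ValueStateSim:
    """Hypothetical stand-in for a ValueState that stores a whole set as one blob."""

    def __init__(self):
        self._blob = pickle.dumps(set())
        self.bytes_moved = 0  # bytes deserialized plus bytes serialized

    def add(self, item):
        self.bytes_moved += len(self._blob)
        value = pickle.loads(self._blob)   # read: deserialize the whole set
        value.add(item)
        self._blob = pickle.dumps(value)   # write: serialize the whole set again
        self.bytes_moved += len(self._blob)

    def items(self):
        return sorted(pickle.loads(self._blob))


class MapStateSim:
    """Hypothetical stand-in for a MapState where each entry is serialized on its own."""

    def __init__(self):
        self._entries = {}
        self.bytes_moved = 0  # bytes serialized for the touched entry only

    def add(self, item):
        key = pickle.dumps(item)
        marker = pickle.dumps(True)        # the map value is just a presence marker
        self._entries[key] = marker
        self.bytes_moved += len(key) + len(marker)

    def items(self):
        return sorted(pickle.loads(key) for key in self._entries)


def compare(n):
    """Insert n integers into both simulations and return the bytes each one moved."""
    value_state, map_state = ValueStateSim(), MapStateSim()
    for i in range(n):
        value_state.add(i)
        map_state.add(i)
    return value_state.bytes_moved, map_state.bytes_moved


if __name__ == "__main__":
    value_bytes, map_bytes = compare(1000)
    print(f"ValueState-style bytes moved: {value_bytes}")
    print(f"MapState-style bytes moved:   {map_bytes}")
```

In this toy model the ValueState-style cost grows with the size of the collection on every update, while the MapState-style cost stays roughly constant per update. Whether a real job sees a similar gap depends on the serializer, the state access pattern, and whether sorted iteration over the whole collection is needed.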