Example SQL:
SELECT *
FROM stream1 s1, stream2 s2
WHERE s1.id = s2.id AND s1.rowtime = s2.rowtime

And we have lots of messages in stream1 and stream2 that share the same rowtime.

It runs fine when using heap as the state backend, but it sometimes requires lots of heap memory (when upstream is out of sync, etc.), and there is a risk of full GC.

When we use RocksDBStateBackend to lower the heap memory usage, we found that our program runs unbearably slow.

After examining the code we found that org.apache.flink.table.runtime.join.TimeBoundedStreamJoin#processElement1() may be the cause of the problem (we are using Flink 1.6, but 1.8 should be the same):

    ...
    // Check if we need to cache the current row.
    if (rightOperatorTime < rightQualifiedUpperBound) {
      // Operator time of right stream has not exceeded the upper window bound of the current
      // row. Put it into the left cache, since later coming records from the right stream are
      // expected to be joined with it.
      var leftRowList = leftCache.get(timeForLeftRow)
      if (null == leftRowList) {
        leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
      }
      leftRowList.add(JTuple2.of(leftRow, emitted))
      leftCache.put(timeForLeftRow, leftRowList)
    ...

In the above code, if there are lots of messages with the same timeForLeftRow, the serialization and deserialization cost is very high when using RocksDBStateBackend: every leftCache.get(timeForLeftRow) deserializes the whole list, and every leftCache.put(...) serializes it again, so buffering n rows under one timestamp costs O(n^2) overall.

A simple fix I came up with is to make the list index part of the MapState key, so each row is stored (and serialized) individually:

    ...
    // cache to store rows from the left stream, keyed by (timestamp, index)
    //private var leftCache: MapState[Long, JList[JTuple2[Row, Boolean]]] = _
    private var leftCache: MapState[JTuple2[Long, Integer], JTuple2[Row, Boolean]] = _
    // number of rows cached per timestamp, i.e. the next free index
    private var leftCacheSize: MapState[Long, Integer] = _
    ...
    // Check if we need to cache the current row.
    if (rightOperatorTime < rightQualifiedUpperBound) {
      // Operator time of right stream has not exceeded the upper window bound of the current
      // row. Put it into the left cache, since later coming records from the right stream are
      // expected to be joined with it.
      //var leftRowList = leftCache.get(timeForLeftRow)
      //if (null == leftRowList) {
      //  leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
      //}
      //leftRowList.add(JTuple2.of(leftRow, emitted))
      //leftCache.put(timeForLeftRow, leftRowList)
      var leftRowListSize = leftCacheSize.get(timeForLeftRow)
      if (null == leftRowListSize) {
        leftRowListSize = Integer.valueOf(0)
      }
      // each put now (de)serializes a single row instead of the whole list
      leftCache.put(JTuple2.of(timeForLeftRow, leftRowListSize), JTuple2.of(leftRow, emitted))
      leftCacheSize.put(timeForLeftRow, leftRowListSize + 1)
    ...

--
LIU Xiao <[hidden email]>
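To make the (timestamp, index) pattern above self-contained, here is a minimal sketch of the same idea in a standalone KeyedProcessFunction. This is not the TimeBoundedStreamJoin code: the class and method names (IndexedBufferFunction, rowsFor) are invented for illustration, and row payloads are simplified to Strings.

    import java.lang.{Integer => JInt, Long => JLong}

    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    // Input: (key, timestamp, payload); the output type is unused here.
    class IndexedBufferFunction
        extends KeyedProcessFunction[String, (String, Long, String), String] {

      // One state entry per buffered row: key = (timestamp, index).
      private var rows: MapState[JTuple2[JLong, JInt], String] = _
      // Rows buffered per timestamp, i.e. the next free index.
      private var sizes: MapState[JLong, JInt] = _

      override def open(parameters: Configuration): Unit = {
        rows = getRuntimeContext.getMapState(new MapStateDescriptor(
          "rows",
          Types.TUPLE[JTuple2[JLong, JInt]](Types.LONG, Types.INT),
          Types.STRING))
        sizes = getRuntimeContext.getMapState(
          new MapStateDescriptor("sizes", Types.LONG, Types.INT))
      }

      override def processElement(
          value: (String, Long, String),
          ctx: KeyedProcessFunction[String, (String, Long, String), String]#Context,
          out: Collector[String]): Unit = {
        val ts = Long.box(value._2)
        // Append in O(1): one small read (the counter) and two small writes,
        // instead of rewriting an ever-growing serialized list.
        val n: JInt = Option(sizes.get(ts)).getOrElse(Int.box(0))
        rows.put(JTuple2.of(ts, n), value._3)
        sizes.put(ts, Int.box(n + 1))
      }

      // Reading back all rows of one timestamp: n point lookups, each of
      // which deserializes a single row.
      private def rowsFor(ts: JLong): Iterator[String] = {
        val n: JInt = Option(sizes.get(ts)).getOrElse(Int.box(0))
        Iterator.range(0, n.intValue()).map(i => rows.get(JTuple2.of(ts, Int.box(i))))
      }
    }

The trade-off of this layout is on the read and cleanup side: fetching one timestamp's rows becomes n point lookups instead of one big get, and expiring a timestamp has to remove n + 1 entries instead of one.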
Hi Xiao,
Thanks for reporting this. Your approach sounds good to me. But we have many similar problems in existing streaming SQL operator implementations, so it would be great if the State API / state backend could provide a better state structure to handle this situation. This is similar to the poor-performance problem of RocksDBListState, and the related discussions have been raised several times [1][2]. The root cause is that RocksDBStateBackend serializes the whole list as a single byte[]. Some ideas were proposed in those threads. I've cc'ed Yu Li, who works on state backends.

Thanks,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-8297
[2]: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLINK-8297-A-solution-for-FLINK-8297-Timebased-RocksDBListState-tc28259.html

On Wed, 14 Aug 2019 at 14:46, LIU Xiao <[hidden email]> wrote:
> [quoted message trimmed; see above]
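To see why "the whole list as a single byte[]" hurts, here is a self-contained toy (no Flink involved; the object name ListVsEntryCost and the in-memory maps are stand-ins for the state backend) that mimics the two access patterns: appending to one big serialized value re-reads and re-writes everything buffered so far, while one-entry-per-key appends touch a constant number of bytes.

    import java.nio.charset.StandardCharsets.UTF_8
    import scala.collection.mutable

    object ListVsEntryCost {
      def main(args: Array[String]): Unit = {
        val row = "some-serialized-row".getBytes(UTF_8)
        val n = 10000

        // Pattern 1: whole list under one key (like MapState[Long, JList[Row]]).
        val store1 = mutable.Map.empty[String, Array[Byte]]
        var moved1 = 0L
        for (_ <- 0 until n) {
          val old = store1.getOrElse("ts", Array.emptyByteArray) // "deserialize" whole list
          val appended = old ++ row                              // re-"serialize" whole list
          store1("ts") = appended
          moved1 += old.length + appended.length
        }

        // Pattern 2: one entry per (timestamp, index) key, as in the proposed fix.
        val store2 = mutable.Map.empty[(String, Int), Array[Byte]]
        var moved2 = 0L
        for (i <- 0 until n) {
          store2(("ts", i)) = row
          moved2 += row.length
        }

        println(s"whole-list pattern moved    $moved1 bytes") // grows quadratically in n
        println(s"entry-per-key pattern moved $moved2 bytes") // grows linearly in n
      }
    }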