Unbearably slow Table API time-windowed stream join with RocksDBStateBackend

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Unbearably slow Table API time-windowed stream join with RocksDBStateBackend

LIU Xiao
Example SQL:

SELECT *
FROM stream1 s1, stream2 s2
WHERE s1.id = s2.id AND s1.rowtime = s2.rowtime

And we have lots of messages in stream1 and stream2 share a same rowtime.

It runs fine when using heap as the state backend,
but requires lots of heap memory sometimes (when upstream out of sync, etc), and a risk of full gc exists.

When we use RocksDBStateBackend to lower the heap memory usage, we found our program runs unbearably slow.

After examing the code we found
org.apache.flink.table.runtime.join.TimeBoundedStreamJoin#processElement1()
may be the cause of the problem (we are using Flink 1.6 but 1.8 should be same):
...
    // Check if we need to cache the current row.
    if (rightOperatorTime < rightQualifiedUpperBound) {
      // Operator time of right stream has not exceeded the upper window bound of the current
      // row. Put it into the left cache, since later coming records from the right stream are
      // expected to be joined with it.
      var leftRowList = leftCache.get(timeForLeftRow)
      if (null == leftRowList) {
        leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
      }
      leftRowList.add(JTuple2.of(leftRow, emitted))
      leftCache.put(timeForLeftRow, leftRowList)
...

In above code, if there are lots of messages with a same timeForLeftRow,
the serialization and deserialization cost will be very high when using RocksDBStateBackend.

A simple fix I came up with:
...
  // cache to store rows from the left stream
  //private var leftCache: MapState[Long, JList[JTuple2[Row, Boolean]]] = _
  private var leftCache: MapState[JTuple2[Long, Integer], JList[JTuple2[Row, Boolean]]] = _
  private var leftCacheSize: MapState[Long, Integer] = _
...
    // Check if we need to cache the current row.
    if (rightOperatorTime < rightQualifiedUpperBound) {
      // Operator time of right stream has not exceeded the upper window bound of the current
      // row. Put it into the left cache, since later coming records from the right stream are
      // expected to be joined with it.
      //var leftRowList = leftCache.get(timeForLeftRow)
      //if (null == leftRowList) {
      //  leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
      //}
      //leftRowList.add(JTuple2.of(leftRow, emitted))
      //leftCache.put(timeForLeftRow, leftRowList)
      var leftRowListSize = leftCacheSize.get(timeForLeftRow)
      if (null == leftRowListSize) {
        leftRowListSize = new Integer(0)
      }
      leftCache.put(JTuple2.of(timeForLeftRow, leftRowListSize), JTuple2.of(leftRow, emitted))
      leftCacheSize.put(timeForLeftRow, leftRowListSize + 1)
...

--
LIU Xiao <[hidden email]>

Reply | Threaded
Open this post in threaded view
|

Re: Unbearably slow Table API time-windowed stream join with RocksDBStateBackend

Jark Wu-2
Hi Xiao,

Thanks for reporting this.
You approach sounds good to me. But we have many similar problems in
existing streaming sql operator implementations.
So I think if State API / statebackend can provide a better state structure
to handle this situation would be great.

This is a similar problem with poor performance of RocksDBListState. And
the relative discussions have been raised several times [1][2].
The root cause is RocsDBStatBackend serialize the whole list as a byte[].
And there were some ideas proposed in the thread.

I cc'ed Yu Li who works on statebackend.

Thanks,
Jark


[1]: https://issues.apache.org/jira/browse/FLINK-8297
[2]:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLINK-8297-A-solution-for-FLINK-8297-Timebased-RocksDBListState-tc28259.html


On Wed, 14 Aug 2019 at 14:46, LIU Xiao <[hidden email]> wrote:

> Example SQL:
>
> SELECT *
> FROM stream1 s1, stream2 s2
> WHERE s1.id = s2.id AND s1.rowtime = s2.rowtime
>
> And we have lots of messages in stream1 and stream2 share a same rowtime.
>
> It runs fine when using heap as the state backend,
> but requires lots of heap memory sometimes (when upstream out of sync,
> etc), and a risk of full gc exists.
>
> When we use RocksDBStateBackend to lower the heap memory usage, we found
> our program runs unbearably slow.
>
> After examing the code we found
> org.apache.flink.table.runtime.join.TimeBoundedStreamJoin#processElement1()
> may be the cause of the problem (we are using Flink 1.6 but 1.8 should be
> same):
> ...
>     // Check if we need to cache the current row.
>     if (rightOperatorTime < rightQualifiedUpperBound) {
>       // Operator time of right stream has not exceeded the upper window
> bound of the current
>       // row. Put it into the left cache, since later coming records from
> the right stream are
>       // expected to be joined with it.
>       var leftRowList = leftCache.get(timeForLeftRow)
>       if (null == leftRowList) {
>         leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
>       }
>       leftRowList.add(JTuple2.of(leftRow, emitted))
>       leftCache.put(timeForLeftRow, leftRowList)
> ...
>
> In above code, if there are lots of messages with a same timeForLeftRow,
> the serialization and deserialization cost will be very high when using
> RocksDBStateBackend.
>
> A simple fix I came up with:
> ...
>   // cache to store rows from the left stream
>   //private var leftCache: MapState[Long, JList[JTuple2[Row, Boolean]]] = _
>   private var leftCache: MapState[JTuple2[Long, Integer],
> JList[JTuple2[Row, Boolean]]] = _
>   private var leftCacheSize: MapState[Long, Integer] = _
> ...
>     // Check if we need to cache the current row.
>     if (rightOperatorTime < rightQualifiedUpperBound) {
>       // Operator time of right stream has not exceeded the upper window
> bound of the current
>       // row. Put it into the left cache, since later coming records from
> the right stream are
>       // expected to be joined with it.
>       //var leftRowList = leftCache.get(timeForLeftRow)
>       //if (null == leftRowList) {
>       //  leftRowList = new util.ArrayList[JTuple2[Row, Boolean]](1)
>       //}
>       //leftRowList.add(JTuple2.of(leftRow, emitted))
>       //leftCache.put(timeForLeftRow, leftRowList)
>       var leftRowListSize = leftCacheSize.get(timeForLeftRow)
>       if (null == leftRowListSize) {
>         leftRowListSize = new Integer(0)
>       }
>       leftCache.put(JTuple2.of(timeForLeftRow, leftRowListSize),
> JTuple2.of(leftRow, emitted))
>       leftCacheSize.put(timeForLeftRow, leftRowListSize + 1)
> ...
>
> --
> LIU Xiao <[hidden email]>
>
>