[jira] [Created] (FLINK-16392) oneside sorted cache in intervaljoin

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-16392) oneside sorted cache in intervaljoin

Shang Yuanchun (Jira)
Chen Qin created FLINK-16392:
--------------------------------

             Summary: oneside sorted cache in intervaljoin
                 Key: FLINK-16392
                 URL: https://issues.apache.org/jira/browse/FLINK-16392
             Project: Flink
          Issue Type: Improvement
          Components: API / DataStream
    Affects Versions: 1.10.0
            Reporter: Chen Qin
             Fix For: 1.11.0


IntervalJoin is getting lots of usecases. Those use cases shares following similar pattern
 * left stream  pulled from static dataset periodically
 * lookup time range is very large (days weeks)
 * right stream is web traffic with high QPS

In current interval join implementation, we treat both streams equal. Specifically as rocksdb fetch and update getting more expensive, performance took hit and unblock large use cases.

In proposed implementation, we plan to introduce two changes
 * allow user opt-in in ProcessJoinFunction if they want to skip scan when intervaljoin operator receive events from left stream(static data set)
 * build sortedMap from otherBuffer of each seen key granularity
 ** expedite right stream lookup of left buffers without access rocksdb everytime
 ** if a key see event from left side, it cleanup buffer and load buffer from right side

 

Open discussion
 * how to control cache size?
 ** TBD
 * how to avoid dirty cache
 ** if a given key see insertion from other side, cache will be cleared for that key and rebuild. This is a small overhead to populate cache, compare with current rocksdb implemenation, we need do full loop at every event. It saves on bucket scan logic.
 * what happens when checkpoint/restore
 ** state still persists in statebackend, clear cache and rebuild of each new key seen.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)