[jira] [Created] (FLINK-19708) Support KeyedSortedMapState in DataStream API


Shang Yuanchun (Jira)
Sean Z created FLINK-19708:
------------------------------

             Summary: Support KeyedSortedMapState in DataStream API
                 Key: FLINK-19708
                 URL: https://issues.apache.org/jira/browse/FLINK-19708
             Project: Flink
          Issue Type: New Feature
          Components: API / DataStream, Runtime / State Backends
            Reporter: Sean Z


The DataStream API currently does not support a SortedMapState. Many use cases rely on sorted time-series data, such as range queries or fetching the next higher/lower key, and ordering is a natural property of time-series stream processing. We therefore propose adding a KeyedSortedMapState.
 
SortedMapState was previously discussed in [FLINK-6219|https://issues.apache.org/jira/browse/FLINK-6219], and that discussion was closed because the blink code was expected to cover this feature. However, the [blink code|https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java] has not been merged into the master branch since then. The main concern is that keys are compared inconsistently between the heap and off-heap state backends. In RocksDB, comparison is performed on the serialized bytes, which makes supporting generic key types challenging, whereas in the heap state backend comparison relies on the Comparable interface.
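
To illustrate the inconsistency, here is a minimal standalone sketch (not Flink or blink code). It assumes the key is a long written as big-endian two's-complement bytes, and that the off-heap backend orders keys by unsigned lexicographic byte comparison, as RocksDB's default bytewise comparator does. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

public class ByteOrderMismatch {

    // Plain big-endian two's-complement encoding (assumed serializer behavior).
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Unsigned lexicographic comparison of two byte arrays.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        long x = -1L;
        long y = 1L;
        // Heap-style comparison via Comparable: -1 sorts before 1.
        System.out.println("Comparable: " + Integer.signum(Long.compare(x, y)));
        // Byte-style comparison: 0xFF... sorts after 0x00...01, so -1 sorts after 1.
        System.out.println("Bytes:      " + Integer.signum(compareBytes(encode(x), encode(y))));
    }
}
```

Under these assumptions the two backends would iterate the same keys in different orders as soon as a negative key is present.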
 
Two possible solutions were discussed on the dev mailing list:
 
1. There is a prototype feature called TemporalState, which could limit the key type to Long. This makes sense because most use cases use a timestamp as the key, but it rules out generic key types. We would also need to handle negative long values gracefully.
 
2. We keep the different sorting behavior of the different state backends and, by default, sort by byte comparison of the serialized keys in off-heap state backends. Users can provide their own serializer if they want to sort a custom type on RocksDB. This solution could start from the blink implementation, which already supports serializing various key types (almost all the atomic types) into order-preserving bytes: [code|https://github.com/apache/flink/tree/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered]
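
As a rough illustration of what "ordered bytes" could mean for a long key, including negative values, the sketch below flips the sign bit before writing the value big-endian. This is a common order-preserving encoding and is not taken from the blink implementation. The class and method names are hypothetical, and a TreeMap with a byte comparator stands in for a byte-ordered store.

```java
import java.nio.ByteBuffer;
import java.util.TreeMap;

public class OrderedLongEncoding {

    // Flip the sign bit so that unsigned byte order matches signed numeric order.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    // Inverse of encode: read the big-endian long and flip the sign bit back.
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    // Unsigned lexicographic comparison of two byte arrays.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        // Stand-in for a store that orders entries by serialized key bytes.
        TreeMap<byte[], String> state = new TreeMap<>(OrderedLongEncoding::compareBytes);
        for (long ts : new long[] {5L, -3L, 0L, Long.MIN_VALUE, Long.MAX_VALUE}) {
            state.put(encode(ts), "event@" + ts);
        }
        // Keys come back in ascending numeric order, negatives first.
        for (byte[] key : state.keySet()) {
            System.out.println(decode(key));
        }
    }
}
```

With such an encoding, byte order and Comparable order agree for every long value, so heap and off-heap backends would iterate in the same order for this key type. Other key types would need their own order-preserving encodings.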
 
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)