Sean Z created FLINK-19708:
------------------------------
Summary: Support KeyedSortedMapState in DataStream API
Key: FLINK-19708
URL:
https://issues.apache.org/jira/browse/FLINK-19708 Project: Flink
Issue Type: New Feature
Components: API / DataStream, Runtime / State Backends
Reporter: Sean Z
Current DataStream API doesn't have SortedMapState supported. There are lots of use cases based on sorted time-series data like range-query or higher/lower key fetch, and ordered data seems like a nature of time-series stream processing. Therefore, we propose to support the KeyedSortedMapState feature.
There were some previous discussions [FLINK-6219|
https://issues.apache.org/jira/browse/FLINK-6219] about SortedMapState, and the thread was closed because blink code might cover this feature. However, the [blink code|[
https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java]] wasn't merged into the master branch since then. The major concern is the inconsistent comparison between heap/off-heap state backends. In RocksDB, the comparison should be based on bytes, which makes generic key types support challenging, and in heap state backend, the comparison is more about Comparable interface.
There are two possible solutions to this issue as discussed in dev email list,
1. There is a prototype feature called TemporalState which could limit the key type to Long type. It makes sense for most of the use cases are about timestamp as a key but it brings limitations to support generic key types. We also need to gracefully handle negative long.
2. We keep the different sorting behavior of different state backends and set it to bytes comparison for given serialization by default in off-heap state backends. Let users provide their own specific serializer if they want to sort some customized type on RocksDB. This solution could start with blink implementation, which has already supported serializing various key types (almost all the atomic types) into an ordered bytes. [code|[
https://github.com/apache/flink/tree/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered]]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)