Hi devs,
Current DataStream API doesn't have SortedMapState supported. There are lots of use cases based on sorted time-series data like range-query or higher/lower key fetch, and ordered data seems like a nature of time-series stream processing. Therefore, we propose to support the KeyedSortedMapState feature. There were some previous discussions [1] about SortedMapState, and the thread was closed because blink code might cover this feature. However, the blink code[2] wasn't merged into the master branch since then. The major concern is the inconsistent comparison between heap/off-heap state backends. In RocksDB, the comparison should be based on bytes, which makes generic key types support challenging, and in heap state backend, the comparison is more about Comparable interface. There are two possible solutions to this issue in my opinion, 1. We could limit the key type to Long type, for most of the use cases are about timestamp as a key. It's easier to implement but brings limitations to support generic key types. 2. We keep the different sorting behavior of different state backends and set it to bytes comparison for given serialization by default in off-heap state backends. Let users provide their own specific serializer if they want to sort some customized type on RocksDB. Look forward to having some discussions about this feature. Please share your ideas if anyone has context on this. Thanks! Best, Xinghan [1] https://issues.apache.org/jira/browse/FLINK-6219 [2] https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java |
I'm very interested in this topic, and have even done some prototyping of
solution 1 -- limiting the key type to Long -- which Nico Kruber and I called TemporalState in our prototype. I look forward to sharing what we learned, and to discussing this further, but I am completely overwhelmed with Flink Forward preparations at the moment. Best, David On Wed, Oct 14, 2020 at 9:30 AM Sean Z <[hidden email]> wrote: > Hi devs, > > Current DataStream API doesn't have SortedMapState supported. There are > lots of use cases based on sorted time-series data like range-query or > higher/lower key fetch, and ordered data seems like a nature of time-series > stream processing. Therefore, we propose to support the KeyedSortedMapState > feature. > > There were some previous discussions [1] about SortedMapState, and the > thread was closed because blink code might cover this feature. However, the > blink code[2] wasn't merged into the master branch since then. The major > concern is the inconsistent comparison between heap/off-heap state > backends. In RocksDB, the comparison should be based on bytes, which makes > generic key types support challenging, and in heap state backend, the > comparison is more about Comparable interface. > > There are two possible solutions to this issue in my opinion, > 1. We could limit the key type to Long type, for most of the use cases are > about timestamp as a key. It's easier to implement but brings limitations > to support generic key types. > 2. We keep the different sorting behavior of different state backends and > set it to bytes comparison for given serialization by default in off-heap > state backends. Let users provide their own specific serializer if they > want to sort some customized type on RocksDB. > > Look forward to having some discussions about this feature. Please share > your ideas if anyone has context on this. Thanks! > > Best, > Xinghan > > [1] https://issues.apache.org/jira/browse/FLINK-6219 > [2] > > https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java > |
Thanks for the reply! Look forward to learning more about this prototype.
Is there any way that we could track this TemporalState like Jira issue? or should we start to create one in Jira? so anyone has interest like me, could be part of the loop. Besides, is there any written docs/code about the prototype so we could have more context about that? Best, Xinghan On Wed, Oct 14, 2020 at 11:09 AM David Anderson <[hidden email]> wrote: > I'm very interested in this topic, and have even done some prototyping of > solution 1 -- limiting the key type to Long -- which Nico Kruber and I > called TemporalState in our prototype. > > I look forward to sharing what we learned, and to discussing this further, > but I am completely overwhelmed with Flink Forward preparations at the > moment. > > Best, > David > > On Wed, Oct 14, 2020 at 9:30 AM Sean Z <[hidden email]> wrote: > > > Hi devs, > > > > Current DataStream API doesn't have SortedMapState supported. There are > > lots of use cases based on sorted time-series data like range-query or > > higher/lower key fetch, and ordered data seems like a nature of > time-series > > stream processing. Therefore, we propose to support the > KeyedSortedMapState > > feature. > > > > There were some previous discussions [1] about SortedMapState, and the > > thread was closed because blink code might cover this feature. However, > the > > blink code[2] wasn't merged into the master branch since then. The major > > concern is the inconsistent comparison between heap/off-heap state > > backends. In RocksDB, the comparison should be based on bytes, which > makes > > generic key types support challenging, and in heap state backend, the > > comparison is more about Comparable interface. > > > > There are two possible solutions to this issue in my opinion, > > 1. We could limit the key type to Long type, for most of the use cases > are > > about timestamp as a key. It's easier to implement but brings limitations > > to support generic key types. > > 2. We keep the different sorting behavior of different state backends and > > set it to bytes comparison for given serialization by default in off-heap > > state backends. Let users provide their own specific serializer if they > > want to sort some customized type on RocksDB. > > > > Look forward to having some discussions about this feature. Please share > > your ideas if anyone has context on this. Thanks! > > > > Best, > > Xinghan > > > > [1] https://issues.apache.org/jira/browse/FLINK-6219 > > [2] > > > > > https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java > > > |
Hi,
Thanks for bringing this discussion. I think limiting the key type to Long can't resolve the comparison problem, because the bytes order and value order of negative numbers is different. Unless, we limit the key type to positive Long. But how to check this before submitting a job? In Blink code, we keep different sorting behavior in different statebackends. We also supported sorted map state for various key types (almost all the atomic types). The idea is serializing the given type value into an ordered bytes, see more: https://github.com/apache/flink/tree/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered Best, Jark On Thu, 15 Oct 2020 at 06:46, Sean Z <[hidden email]> wrote: > Thanks for the reply! Look forward to learning more about this prototype. > Is there any way that we could track this TemporalState like Jira issue? or > should we start to create one in Jira? so anyone has interest like me, > could be part of the loop. Besides, is there any written docs/code about > the prototype so we could have more context about that? > > Best, > Xinghan > > > On Wed, Oct 14, 2020 at 11:09 AM David Anderson <[hidden email]> > wrote: > > > I'm very interested in this topic, and have even done some prototyping of > > solution 1 -- limiting the key type to Long -- which Nico Kruber and I > > called TemporalState in our prototype. > > > > I look forward to sharing what we learned, and to discussing this > further, > > but I am completely overwhelmed with Flink Forward preparations at the > > moment. > > > > Best, > > David > > > > On Wed, Oct 14, 2020 at 9:30 AM Sean Z <[hidden email]> wrote: > > > > > Hi devs, > > > > > > Current DataStream API doesn't have SortedMapState supported. There are > > > lots of use cases based on sorted time-series data like range-query or > > > higher/lower key fetch, and ordered data seems like a nature of > > time-series > > > stream processing. Therefore, we propose to support the > > KeyedSortedMapState > > > feature. > > > > > > There were some previous discussions [1] about SortedMapState, and the > > > thread was closed because blink code might cover this feature. However, > > the > > > blink code[2] wasn't merged into the master branch since then. The > major > > > concern is the inconsistent comparison between heap/off-heap state > > > backends. In RocksDB, the comparison should be based on bytes, which > > makes > > > generic key types support challenging, and in heap state backend, the > > > comparison is more about Comparable interface. > > > > > > There are two possible solutions to this issue in my opinion, > > > 1. We could limit the key type to Long type, for most of the use cases > > are > > > about timestamp as a key. It's easier to implement but brings > limitations > > > to support generic key types. > > > 2. We keep the different sorting behavior of different state backends > and > > > set it to bytes comparison for given serialization by default in > off-heap > > > state backends. Let users provide their own specific serializer if they > > > want to sort some customized type on RocksDB. > > > > > > Look forward to having some discussions about this feature. Please > share > > > your ideas if anyone has context on this. Thanks! > > > > > > Best, > > > Xinghan > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-6219 > > > [2] > > > > > > > > > https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java > > > > > > |
Hi Jark,
Thanks for the reply and sharing thoughts. Yes, negative long will make things complicated. We had the exact same issue when implementing our own sortedMapState. This could be solved by some special pre-defined serializers, and it looks like that's what blink did [1] as you mentioned. Blink implementation could be a good starting point and it would be better to extend to support random key types as long as users provide their own serializer, which is what I mentioned in option 2. I'm more in favor of this approach too. But the support of random key types seems the main blocker that prevents this Blink feature being merged into Flink, from what I understand. In my opinion, limiting the key type might be easier to implement and it is actually not a bad idea to start with lite/consistent/fixed features. but yeah I'm not sure what the Temporal State is trying to solve here for I don't have too much context on it. [1] https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered/OrderedBytes.java Best, Xinghan On Wed, Oct 14, 2020 at 7:19 PM Jark Wu <[hidden email]> wrote: > Hi, > > Thanks for bringing this discussion. > > I think limiting the key type to Long can't resolve the comparison problem, > because the bytes order and value order of negative numbers is different. > Unless, we limit the key type to positive Long. But how to check this > before submitting a job? > > In Blink code, we keep different sorting behavior in different > statebackends. > We also supported sorted map state for various key types (almost all the > atomic types). > The idea is serializing the given type value into an ordered bytes, see > more: > > > https://github.com/apache/flink/tree/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered > > Best, > Jark > > On Thu, 15 Oct 2020 at 06:46, Sean Z <[hidden email]> wrote: > > > Thanks for the reply! Look forward to learning more about this prototype. > > Is there any way that we could track this TemporalState like Jira issue? > or > > should we start to create one in Jira? so anyone has interest like me, > > could be part of the loop. Besides, is there any written docs/code about > > the prototype so we could have more context about that? > > > > Best, > > Xinghan > > > > > > On Wed, Oct 14, 2020 at 11:09 AM David Anderson <[hidden email]> > > wrote: > > > > > I'm very interested in this topic, and have even done some prototyping > of > > > solution 1 -- limiting the key type to Long -- which Nico Kruber and I > > > called TemporalState in our prototype. > > > > > > I look forward to sharing what we learned, and to discussing this > > further, > > > but I am completely overwhelmed with Flink Forward preparations at the > > > moment. > > > > > > Best, > > > David > > > > > > On Wed, Oct 14, 2020 at 9:30 AM Sean Z <[hidden email]> wrote: > > > > > > > Hi devs, > > > > > > > > Current DataStream API doesn't have SortedMapState supported. There > are > > > > lots of use cases based on sorted time-series data like range-query > or > > > > higher/lower key fetch, and ordered data seems like a nature of > > > time-series > > > > stream processing. Therefore, we propose to support the > > > KeyedSortedMapState > > > > feature. > > > > > > > > There were some previous discussions [1] about SortedMapState, and > the > > > > thread was closed because blink code might cover this feature. > However, > > > the > > > > blink code[2] wasn't merged into the master branch since then. The > > major > > > > concern is the inconsistent comparison between heap/off-heap state > > > > backends. In RocksDB, the comparison should be based on bytes, which > > > makes > > > > generic key types support challenging, and in heap state backend, the > > > > comparison is more about Comparable interface. > > > > > > > > There are two possible solutions to this issue in my opinion, > > > > 1. We could limit the key type to Long type, for most of the use > cases > > > are > > > > about timestamp as a key. It's easier to implement but brings > > limitations > > > > to support generic key types. > > > > 2. We keep the different sorting behavior of different state backends > > and > > > > set it to bytes comparison for given serialization by default in > > off-heap > > > > state backends. Let users provide their own specific serializer if > they > > > > want to sort some customized type on RocksDB. > > > > > > > > Look forward to having some discussions about this feature. Please > > share > > > > your ideas if anyone has context on this. Thanks! > > > > > > > > Best, > > > > Xinghan > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-6219 > > > > [2] > > > > > > > > > > > > > > https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java > > > > > > > > > > |
Do we think this will be useful for users or do we first want to
introduce this for internal use cases, such as the Table API/SQL runner? Aljoscha On 15.10.20 10:35, Sean Z wrote: > Hi Jark, > > Thanks for the reply and sharing thoughts. > Yes, negative long will make things complicated. We had the exact same > issue when implementing our own sortedMapState. > This could be solved by some special pre-defined serializers, and it looks > like that's what blink did [1] as you mentioned. > > Blink implementation could be a good starting point and it would be better > to extend to support random key types as long as users provide their own > serializer, which is what I mentioned in option 2. I'm more in favor of > this approach too. But the support of random key types seems the main > blocker that prevents this Blink feature being merged into Flink, from what > I understand. > > In my opinion, limiting the key type might be easier to implement and it is > actually not a bad idea to start with lite/consistent/fixed features. but > yeah I'm not sure what the Temporal State is trying to solve here for I > don't have too much context on it. > > > [1] > https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered/OrderedBytes.java > > > Best, > Xinghan > > On Wed, Oct 14, 2020 at 7:19 PM Jark Wu <[hidden email]> wrote: > >> Hi, >> >> Thanks for bringing this discussion. >> >> I think limiting the key type to Long can't resolve the comparison problem, >> because the bytes order and value order of negative numbers is different. >> Unless, we limit the key type to positive Long. But how to check this >> before submitting a job? >> >> In Blink code, we keep different sorting behavior in different >> statebackends. >> We also supported sorted map state for various key types (almost all the >> atomic types). >> The idea is serializing the given type value into an ordered bytes, see >> more: >> >> >> https://github.com/apache/flink/tree/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered >> >> Best, >> Jark >> >> On Thu, 15 Oct 2020 at 06:46, Sean Z <[hidden email]> wrote: >> >>> Thanks for the reply! Look forward to learning more about this prototype. >>> Is there any way that we could track this TemporalState like Jira issue? >> or >>> should we start to create one in Jira? so anyone has interest like me, >>> could be part of the loop. Besides, is there any written docs/code about >>> the prototype so we could have more context about that? >>> >>> Best, >>> Xinghan >>> >>> >>> On Wed, Oct 14, 2020 at 11:09 AM David Anderson <[hidden email]> >>> wrote: >>> >>>> I'm very interested in this topic, and have even done some prototyping >> of >>>> solution 1 -- limiting the key type to Long -- which Nico Kruber and I >>>> called TemporalState in our prototype. >>>> >>>> I look forward to sharing what we learned, and to discussing this >>> further, >>>> but I am completely overwhelmed with Flink Forward preparations at the >>>> moment. >>>> >>>> Best, >>>> David >>>> >>>> On Wed, Oct 14, 2020 at 9:30 AM Sean Z <[hidden email]> wrote: >>>> >>>>> Hi devs, >>>>> >>>>> Current DataStream API doesn't have SortedMapState supported. There >> are >>>>> lots of use cases based on sorted time-series data like range-query >> or >>>>> higher/lower key fetch, and ordered data seems like a nature of >>>> time-series >>>>> stream processing. Therefore, we propose to support the >>>> KeyedSortedMapState >>>>> feature. >>>>> >>>>> There were some previous discussions [1] about SortedMapState, and >> the >>>>> thread was closed because blink code might cover this feature. >> However, >>>> the >>>>> blink code[2] wasn't merged into the master branch since then. The >>> major >>>>> concern is the inconsistent comparison between heap/off-heap state >>>>> backends. In RocksDB, the comparison should be based on bytes, which >>>> makes >>>>> generic key types support challenging, and in heap state backend, the >>>>> comparison is more about Comparable interface. >>>>> >>>>> There are two possible solutions to this issue in my opinion, >>>>> 1. We could limit the key type to Long type, for most of the use >> cases >>>> are >>>>> about timestamp as a key. It's easier to implement but brings >>> limitations >>>>> to support generic key types. >>>>> 2. We keep the different sorting behavior of different state backends >>> and >>>>> set it to bytes comparison for given serialization by default in >>> off-heap >>>>> state backends. Let users provide their own specific serializer if >> they >>>>> want to sort some customized type on RocksDB. >>>>> >>>>> Look forward to having some discussions about this feature. Please >>> share >>>>> your ideas if anyone has context on this. Thanks! >>>>> >>>>> Best, >>>>> Xinghan >>>>> >>>>> [1] https://issues.apache.org/jira/browse/FLINK-6219 >>>>> [2] >>>>> >>>>> >>>> >>> >> https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java >>>>> >>>> >>> >> > |
Free forum by Nabble | Edit this page |