Hey All,
The last couple of days I have been playing around with the idea of building a streaming key-value store abstraction using stateful streaming operators that can be used seamlessly within Flink Streaming programs.

Operations executed on this KV store would be fault tolerant, as it integrates with the checkpointing mechanism, and if we add timestamps to each put/get/... operation we can use the watermarks to create fully deterministic results. This functionality is very useful for many applications, and is very hard to implement properly with some dedicated KV store.

The KVStore abstraction could look as follows:

KVStore<K,V> store = new KVStore<>();

Operations:

store.put(DataStream<Tuple2<K,V>>)
store.get(DataStream<K>) -> DataStream<KV<K,V>>
store.remove(DataStream<K>) -> DataStream<KV<K,V>>
store.multiGet(DataStream<K[]>) -> DataStream<KV<K,V>[]>
store.getWithKeySelector(DataStream<X>, KeySelector<X,K>) -> DataStream<KV<X,V>[]>

For the resulting streams I used a special KV abstraction which lets us return null values.

The implementation uses a simple streaming operator for executing most of the operations (for multi get there is an additional merge operator), with either local or partitioned states for storing the key-value pairs (my current prototype uses local states). It can either execute operations eagerly (which would not provide deterministic results), or buffer up operations and execute them in order upon watermarks.

As for use cases, you can probably come up with many, so I will save that for now :D

I have a prototype implementation here that can execute the operations described above (it does not handle watermarks and time yet):

https://github.com/gyfora/flink/tree/KVStore

And also an example job:

https://github.com/gyfora/flink/blob/KVStore/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/KVStreamExample.java

What do you think?
If you like it, I will work on writing tests; it also still needs a lot of tweaking and refactoring. This might be something we want to include with the standard streaming libraries at one point.

Cheers,
Gyula

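The difference between eager execution and watermark-ordered execution mentioned in the message above can be illustrated with a minimal sketch. It is plain Java with no Flink dependency, and every name in it (WatermarkBufferedStore, Op, buffer, onWatermark) is hypothetical rather than taken from the prototype. It also assumes that a watermark w guarantees no operation with a timestamp of w or lower arrives afterwards.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical sketch, not the prototype's code; it uses no Flink classes. */
final class WatermarkBufferedStore {

    /** A timestamped operation: a null value means "get", anything else means "put". */
    static final class Op {
        final long timestamp;
        final String key;
        final Integer value;

        Op(long timestamp, String key, Integer value) {
            this.timestamp = timestamp;
            this.key = key;
            this.value = value;
        }
    }

    private final Map<String, Integer> state = new HashMap<>();
    // Pending operations, smallest timestamp first (ties come out in no particular order).
    private final PriorityQueue<Op> pending =
            new PriorityQueue<>(Comparator.comparingLong((Op op) -> op.timestamp));
    private final List<String> output = new ArrayList<>();

    /** Eager mode: execute in arrival order, so results depend on arrival order. */
    void applyEagerly(Op op) {
        execute(op);
    }

    /** Buffered mode: hold the operation until a watermark covers its timestamp. */
    void buffer(Op op) {
        pending.add(op);
    }

    /** Executes, in timestamp order, every buffered operation the watermark covers. */
    void onWatermark(long watermark) {
        while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
            execute(pending.poll());
        }
    }

    private void execute(Op op) {
        if (op.value != null) {
            state.put(op.key, op.value);
        } else {
            output.add(op.key + "=" + state.get(op.key)); // "key=null" when absent
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        Op get = new Op(2, "a", null); // arrives first, but has the later timestamp
        Op put = new Op(1, "a", 42);

        WatermarkBufferedStore eager = new WatermarkBufferedStore();
        eager.applyEagerly(get);
        eager.applyEagerly(put);
        System.out.println("eager:    " + eager.output()); // eager:    [a=null]

        WatermarkBufferedStore buffered = new WatermarkBufferedStore();
        buffered.buffer(get);
        buffered.buffer(put);
        buffered.onWatermark(2);
        System.out.println("buffered: " + buffered.output()); // buffered: [a=42]
    }
}
```

In the eager run the get sees an empty store because it happened to arrive first; in the buffered run the same two operations produce the same answer regardless of arrival order, at the cost of waiting for the watermark.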
Hello,
As for use cases, in my old job at Ericsson we were building a streaming system that was processing data from telephone networks, and it was using key-value stores a LOT. For example, keeping track of various state info of the users (which cell they are currently connected to, what bearers they have, ...); mapping from IDs of users in one subsystem of the telephone network to the IDs of the same users in another subsystem; mapping from IDs of phone calls to lists of IDs of participating users; etc.
So I imagine they would like this a lot. (At least, if they were considering moving to Flink :))

Best,
Gabor

2015-09-08 13:35 GMT+02:00 Gyula Fóra:
> [...]

@Gyula
Can you explain a bit what this key-value store would do more than the partitioned key/value state?

On Tue, Sep 8, 2015 at 2:49 PM, Gábor Gévay wrote:
> [...]

@Stephan:
Technically speaking, this is really just a partitioned key-value state and a fancy operator executing special operations on this state.

From the user's perspective, though, this is something hard to implement. If you want to share state between two streams this way, for instance (getting updates from one stream and enriching the other one), you would probably use a connected data stream and custom-implement the key-value store logic. But once you have one (or more) update streams and many get streams, this implementation will not work. So either the user ends up replicating the whole state in multiple connected operators, or custom-implements some inefficient wrapper class to take care of all the put/get operations.

The idea behind this is to give a very simple abstraction for this type of processing that uses the Flink runtime efficiently instead of relying on custom implementations.

Let me give you a stupid example:

You receive temperature data in the form of (city, temperature), and you are computing a rolling avg for each city.
Now you have 2 other incoming streams: the first is a stream of some other info about the city, let's say population (city, population), and you want to combine it with the last known avg temperature to produce (city, temp, pop) triplets. The second stream is a pair of cities (city, city), and you are interested in the difference of the temperatures.

For enriching the (city, pop) to (city, temp, pop) you would probably use a CoFlatMap and store the last known rolling avg as state. Computing the (city, city) temperature difference is a little more difficult, as you need to get the temperature for both cities and then combine them in a second operator. If you don't want to replicate your state, you have to combine these two problems into a common wrapper type and execute them in the same operator, which will keep the avg state.

With the KVStore abstraction this is very simple:
you create a KVStore<City, Temp>.
For enriching you use kvStore.getWithKeySelector(), which will give you ((city, pop), temp) pairs, and you are done. For computing the difference, you can use kvStore.multiGet(...) and get the values for the 2 cities at the same time. The KV store will abstract away getting the 2 keys separately and merging them, so it will return [(city1, t1), (city2, t2)].

This might be a slightly artificial example, but I think it makes the point. Implementing these jobs efficiently is not trivial for the users, but I think it is a very common problem.

Stephan Ewen wrote (Tue, Sep 8, 2015, 14:53):
> [...]

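The city temperature example above can be mimicked in a few lines to show why a single shared store serves both queries. This is only a sketch under loud assumptions: it is eager, single-process, plain Java rather than Flink, the class CityTemperatureSketch is hypothetical, its methods take single records instead of DataStreams, and the temperature and population values are made up.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, eager, single-process stand-in for a KVStore<City, Temp>. */
final class CityTemperatureSketch {

    // The only copy of the state: city -> last known rolling average temperature.
    private final Map<String, Double> avgTemp = new HashMap<>();

    /** Stand-in for store.put(...), applied to one update at a time. */
    void put(String city, double temp) {
        avgTemp.put(city, temp);
    }

    /** Stand-in for getWithKeySelector: pairs the whole input with the looked-up value. */
    <X> Map.Entry<X, Double> getWithKeySelector(X input, Function<X, String> keySelector) {
        return new SimpleImmutableEntry<>(input, avgTemp.get(keySelector.apply(input)));
    }

    /** Stand-in for multiGet: one result per requested key, null when the key is absent. */
    Double[] multiGet(String... cities) {
        Double[] temps = new Double[cities.length];
        for (int i = 0; i < cities.length; i++) {
            temps[i] = avgTemp.get(cities[i]);
        }
        return temps;
    }

    public static void main(String[] args) {
        CityTemperatureSketch store = new CityTemperatureSketch();
        store.put("Budapest", 21.5); // made-up values
        store.put("Berlin", 18.0);

        // Enrichment query: (city, pop) -> ((city, pop), temp)
        Map.Entry<String, Integer> cityPop = new SimpleImmutableEntry<>("Berlin", 3500000);
        Map.Entry<Map.Entry<String, Integer>, Double> enriched =
                store.getWithKeySelector(cityPop, input -> input.getKey());
        System.out.println(enriched.getKey().getKey() + ", " + enriched.getValue()
                + ", " + enriched.getKey().getValue()); // Berlin, 18.0, 3500000

        // Difference query: both lookups hit the same state, with no replication.
        Double[] temps = store.multiGet("Budapest", "Berlin");
        System.out.println(temps[0] - temps[1]); // 3.5
    }
}
```

Both queries read the one avgTemp map, which is the point of the abstraction: the update logic is written once and any number of get-style streams can be attached to it.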
That's a very nice application of the Stream API and partitioned state. :D
I think we should run some tests on a cluster based on this to see what kind of throughput the partitioned state system can handle and also how it behaves with larger numbers of keys. The KVStore is just an interface and the really heavy lifting is done by the state system so this would be a good test for it.

On Tue, 8 Sep 2015 at 15:10 Gyula Fóra wrote:
> [...]

Just a silly question.
For the example you described, in a data flow model, you would do something like this:

Have query ids added to the city pairs (qid, city1, city2),
then split the query stream on the two cities and co-group it with the updates stream ((city1, qid), (city, temp)), same for city2,
then emit (qid, city1, temp1), (qid, city2, temp2) from the two co-groups,
group on the qid, and apply a difference operator to get the final answer.

Is your idea to implement a way to generalize this logic, or would it use remote read/write to a KV-store?

--
Gianmarco

On 8 September 2015 at 16:27, Aljoscha Krettek wrote:
> [...]

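The qid-based plan described in the message above can be walked through on in-memory collections. This is a batch-style illustration only, assuming a snapshot of the latest temperature per city is available; a real streaming co-group would need windows or state. The class QueryIdJoinSketch, the helper Half, and the sample values are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical batch-style walk through the qid/co-group plan; no Flink classes. */
final class QueryIdJoinSketch {

    /** One half of a (qid, city1, city2) query after the split. */
    static final class Half {
        final int qid;
        final int position; // 0 for city1, 1 for city2
        final String city;
        Double temp;        // filled in by the join step; stays null for unknown cities

        Half(int qid, int position, String city) {
            this.qid = qid;
            this.position = position;
            this.city = city;
        }
    }

    /** Returns qid -> temp(city1) - temp(city2), or null when a temperature is missing. */
    static Map<Integer, Double> differences(Map<String, Double> latestTemp,
                                            List<String[]> cityPairs) {
        // Step 1: assign a query id to each pair and split it into two keyed halves.
        List<Half> halves = new ArrayList<>();
        for (int qid = 0; qid < cityPairs.size(); qid++) {
            halves.add(new Half(qid, 0, cityPairs.get(qid)[0]));
            halves.add(new Half(qid, 1, cityPairs.get(qid)[1]));
        }
        // Step 2: join each half with the updates on the city key.
        for (Half half : halves) {
            half.temp = latestTemp.get(half.city);
        }
        // Step 3: group on qid, putting each half back into its position.
        Map<Integer, Double[]> byQid = new TreeMap<>();
        for (Half half : halves) {
            byQid.computeIfAbsent(half.qid, q -> new Double[2])[half.position] = half.temp;
        }
        // Step 4: apply the difference operator per query.
        Map<Integer, Double> result = new TreeMap<>();
        for (Map.Entry<Integer, Double[]> entry : byQid.entrySet()) {
            Double[] t = entry.getValue();
            Double diff = (t[0] == null || t[1] == null) ? null : Double.valueOf(t[0] - t[1]);
            result.put(entry.getKey(), diff);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Double> latestTemp = new HashMap<>();
        latestTemp.put("Budapest", 21.5); // made-up values
        latestTemp.put("Berlin", 18.0);

        List<String[]> pairs = Arrays.asList(
                new String[] {"Budapest", "Berlin"},
                new String[] {"Berlin", "Paris"}); // no temperature known for Paris

        System.out.println(differences(latestTemp, pairs)); // {0=3.5, 1=null}
    }
}
```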
Hey Gianmarco,
So the implementation looks something different: The update stream is received by a stateful KVStoreOperator which stores the K-V pairs as their partitioned state. The query for the 2 cities is assigned an ID yes, and is split to the 2 cities, and each of these are sent to the same KVStoreOperator as the update stream. The output is the value for each key practically (qid, city1, temp1) which is retreived from the operator state , and this output is merged in a next operator to form the KV[] output on which the user can execute the difference if he wants. So actually no co-group is happening although semantically it might be similar. Instead I use stateful operators to be much more efficient. Does this answer you question? Gyula Gianmarco De Francisci Morales <[hidden email]> ezt írta (időpont: 2015. szept. 9., Sze, 14:29): > Just a silly question. > For the example you described, in a data flow model, you would do something > like this: > > Have query ids added to the city pairs (qid, city1, city2), > then split the query stream on the two cities and co-group it with the > updates stream ((city1, qid) , (city, temp)), same for city2, > then emit (qid, city1, temp1), (qid, city2, temp2) from the two co-groups, > group on the qid, and apply a difference operator to get the final answer. > > Is your idea to implement a way to generalize this logic, or it would use > remote read/write to a KV-store? > > -- > Gianmarco > > On 8 September 2015 at 16:27, Aljoscha Krettek <[hidden email]> > wrote: > > > That's a very nice application of the Stream API and partitioned state. > :D > > > > I think we should run some tests on a cluster based on this to see what > > kind of throughput the partitioned state system can handle and also how > it > > behaves with larger numbers of keys. The KVStore is just an interface and > > the really heavy lifting is done by the state system so this would be a > > good test for it. 
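The flow described in the reply above (tag the query with an ID, split it into one get per key, answer each get from the store's state, then merge the answers by query ID) can be sketched roughly as follows. This is a plain-Java simulation under stated assumptions, not Flink code and not the prototype's actual implementation: the names MultiGetSketch, StoreOperator, MergeOperator and Partial are hypothetical, and a single in-memory map stands in for the partitioned operator state.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java simulation of the multiGet flow; all names here are hypothetical. */
public class MultiGetSketch {

    /** One per-key answer, tagged with the query it belongs to. */
    static final class Partial {
        final long queryId;
        final int position;   // index of the key inside the original query
        final String key;
        final Double value;   // null when the key is not in the store

        Partial(long queryId, int position, String key, Double value) {
            this.queryId = queryId;
            this.position = position;
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for the stateful store operator that receives updates and gets. */
    static final class StoreOperator {
        private final Map<String, Double> state = new HashMap<>();

        void put(String key, double value) {
            state.put(key, value);
        }

        Partial get(long queryId, int position, String key) {
            return new Partial(queryId, position, key, state.get(key));
        }
    }

    /** Stand-in for the merge operator: buffers answers until a query is complete. */
    static final class MergeOperator {
        private final Map<Long, Partial[]> pending = new HashMap<>();
        private final Map<Long, Integer> received = new HashMap<>();

        /** Returns the full answer array, or null while parts are still missing. */
        Partial[] add(Partial p, int expected) {
            Partial[] slots = pending.computeIfAbsent(p.queryId, id -> new Partial[expected]);
            slots[p.position] = p;
            int count = received.merge(p.queryId, 1, Integer::sum);
            if (count < expected) {
                return null;
            }
            pending.remove(p.queryId);
            received.remove(p.queryId);
            return slots;
        }
    }

    /** Splits a multi-key query into single gets and merges the answers by query ID. */
    static Partial[] multiGet(StoreOperator store, MergeOperator merge, long queryId, String... keys) {
        Partial[] result = null;
        for (int i = 0; i < keys.length; i++) {
            result = merge.add(store.get(queryId, i, keys[i]), keys.length);
        }
        return result;
    }

    public static void main(String[] args) {
        StoreOperator store = new StoreOperator();
        MergeOperator merge = new MergeOperator();
        store.put("Budapest", 21.5);   // update stream: (city, temperature)
        store.put("Berlin", 18.0);

        // Query stream: a pair of cities; the user computes the difference on the merged result.
        Partial[] answer = multiGet(store, merge, 1L, "Budapest", "Berlin");
        double diff = answer[0].value - answer[1].value;
        System.out.println(answer[0].key + " - " + answer[1].key + " = " + diff);
    }
}
```

In a real streaming job the per-key gets would be routed to whichever parallel instance owns that key, and the partial answers could arrive at the merge step in any order, which is why the sketch buffers them by query ID and position instead of assuming they arrive together.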
Yes, pretty clear. I guess semantically it's still a co-group, but
implemented slightly differently.

Thanks!

--
Gianmarco
Hey All,
We decided to make this a standalone library until it is stable enough; then we can decide whether to keep it that way or include it in the project:

https://github.com/gyfora/StreamKV

Cheers,
Gyula
I think that is actually a cool way to kick off an addition to the system. It gives you a lot of flexibility in releasing and testing...

It helps, though, to upload Maven artifacts for it!
> > > > > > > > > > > > > > > > > > As for use cases you can probably come up with many I will > > save > > > > > that > > > > > > > for > > > > > > > > > now :D > > > > > > > > > > > > > > > > > > I have a prototype implementation here that can execute the > > > > > > operations > > > > > > > > > described above (does not handle watermarks and time yet): > > > > > > > > > > > > > > > > > > https://github.com/gyfora/flink/tree/KVStore > > > > > > > > > And also an example job: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/gyfora/flink/blob/KVStore/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/KVStreamExample.java > > > > > > > > > > > > > > > > > > What do you think? > > > > > > > > > If you like it I will work on writing tests and it still > > needs > > > a > > > > > lot > > > > > > of > > > > > > > > > tweaking and refactoring. This might be something we want > to > > > > > include > > > > > > > with > > > > > > > > > the standard streaming libraries at one point. > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > Gyula > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > |
Hi Gyula,
I'm currently looking for ways to enrich streams with external data. Do you have any update on the topic in general, or on StreamKV?

I've checked out the code but it won't build, mainly because StateCheckpointer has been removed since [FLINK-2808]. Any hint on a quick replacement before I dive in deeper?

Cheers,

--
Nam-Luc TRAN
R&D Manager
EURA NOVA
(M) +32 498 37 36 23
euranova.eu <http://euranova.eu> |
Hi!
Sorry for the late answer, I completely missed this email. (Thanks Robert for pointing it out.)

You won't be able to use that project, as it depended on an earlier snapshot version that still had completely different state semantics. I don't think it is realistic that I will re-implement this any time soon, but I think you can easily do what you want in the following way:

Let's say you have 2 streams. The first contains the enrichment data per key, let's say enrichments = DataStream<Tuple2<key, state>>. The second stream is the event stream that you want to enrich: events = DataStream<Tuple2<key, event>>

To apply the enrichments, the easiest way is to use a CoFlatMap with a partitioned value state inside:

    events.connect(enrichments).keyBy(0,0).flatMap(new YourCoFlatMap())

In this case, if you declare a value state inside YourCoFlatMap, it will be kept per key. For example, in the open method:

    state = getRuntimeContext().getState(new ValueStateDescriptor("stateName", type, defaultValue))

Now that you have everything set up, in flatMap1 (for events) you would query the state with state.value() and enrich your data; in flatMap2 you would update the state with state.update(newState).

Does this make sense to you? Or is the use case completely different?

Cheers,
Gyula |
>Sorry for the late answer, I completely missed this email. (Thanks Robert
>for pointing out).

No problem ;)

>Now that you have everything set up, in flatMap1 (for events) you would query the state: state.value() and enrich your data
>in flatMap2 you would update the state: state.update(newState)

In this example, how are the states in the enrichments stream (enrichments = DataStream<Tuple2<key, state>>) and the value state declared inside YourCoFlatMap linked?

>in flatMap2 you would update the state: state.update(newState)

Wouldn't that only update the state declared in YourCoFlatMap, and not the state in the enrichments stream?

Cheers,

2016-03-23 15:38 GMT+01:00 Gyula Fóra <[hidden email]>:

> Hi!
>
> Sorry for the late answer, I completely missed this email. (Thanks Robert for pointing out).
>
> You won't be able to use that project as it was dependent on an earlier snapshot version that still had completely different state semantics.
> I don't think it is realistic that I will re-implement this any time soon, but I think you can easily do what you want in the following way:
>
> Let's say you have 2 streams, the first contains the enrichment data per key, let's say enrichments = DataStream<Tuple2<key, state>>.
> The second stream is the event stream that you want to enrich: events = DataStream<Tuple2<key, event>>
>
> To apply the enrichments the easiest is to use a CoFlatMap with a partitioned value state inside:
>
> events.connect(enrichments).keyBy(0,0).flatMap(new YourCoFlatMap())
>
> In this case if you declare a value state inside YourCoFlatMap it will be kept per key. For example in the open method:
> state = getRuntimeContext().getState(new ValueStateDescriptor("stateName", type, defaultValue)).
>
> Now that you have everything set up, in flatMap1 (for events) you would query the state: state.value() and enrich your data
> in flatMap2 you would update the state: state.update(newState)
>
> Does this make sense to you? Or is the use case completely different?
>
> Cheers,
> Gyula
>
> Nam-Luc Tran <[hidden email]> wrote (on Fri, 18 Mar 2016, 18:25):
>
> > Hi Gyula,
> >
> > I'm currently looking for ways to enrich streams with external data. Have you got any update on the topic in general or on StreamKV?
> >
> > I've checked out the code but it won't build, mainly because StateCheckpointer has been removed since [FLINK-2808]. Any hint on a quick replacement, before I dive in deeper?
> >
> > Cheers,
> >
> > 2015-09-15 20:29 GMT+02:00 Stephan Ewen <[hidden email]>:
> >
> > > I think that is actually a cool way to kick off an addition to the system. Gives you a lot of flexibility and releasing and testing...
> > >
> > > It helps, though, to upload maven artifacts for it!
> > >
> > > On Tue, Sep 15, 2015 at 7:18 PM, Gyula Fóra <[hidden email]> wrote:
> > >
> > > > Hey All,
> > > >
> > > > We decided to make this a standalone library until it is stable enough and then we can decide whether we want to keep it like that or include in the project:
> > > >
> > > > https://github.com/gyfora/StreamKV
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > Gianmarco De Francisci Morales <[hidden email]> wrote (on Wed, 9 Sep 2015, 20:25):
> > > >
> > > > > Yes, pretty clear. I guess semantically it's still a co-group, but implemented slightly differently.
> > > > >
> > > > > Thanks!
> > > > >
> > > > > --
> > > > > Gianmarco
> > > > >
> > > > > On 9 September 2015 at 15:37, Gyula Fóra <[hidden email]> wrote:
> > > > >
> > > > > > Hey Gianmarco,
> > > > > >
> > > > > > So the implementation looks somewhat different:
> > > > > >
> > > > > > The update stream is received by a stateful KVStoreOperator which stores the K-V pairs as their partitioned state.
> > > > > >
> > > > > > The query for the 2 cities is assigned an ID yes, and is split to the 2 cities, and each of these is sent to the same KVStoreOperator as the update stream. The output is practically the value for each key (qid, city1, temp1), which is retrieved from the operator state, and this output is merged in a next operator to form the KV[] output on which the user can execute the difference if he wants.
> > > > > >
> > > > > > So actually no co-group is happening although semantically it might be similar. Instead I use stateful operators to be much more efficient.
> > > > > >
> > > > > > Does this answer your question?
> > > > > >
> > > > > > Gyula
> > > > > >
> > > > > > Gianmarco De Francisci Morales <[hidden email]> wrote (on Wed, 9 Sep 2015, 14:29):
> > > > > >
> > > > > > > Just a silly question.
> > > > > > > For the example you described, in a data flow model, you would do something like this:
> > > > > > >
> > > > > > > Have query ids added to the city pairs (qid, city1, city2),
> > > > > > > then split the query stream on the two cities and co-group it with the updates stream ((city1, qid) , (city, temp)), same for city2,
> > > > > > > then emit (qid, city1, temp1), (qid, city2, temp2) from the two co-groups,
> > > > > > > group on the qid, and apply a difference operator to get the final answer.
> > > > > > >
> > > > > > > Is your idea to implement a way to generalize this logic, or would it use remote read/write to a KV-store?
> > > > > > >
> > > > > > > --
> > > > > > > Gianmarco
> > > > > > >
> > > > > > > On 8 September 2015 at 16:27, Aljoscha Krettek <[hidden email]> wrote:
> > > > > > >
> > > > > > > > That's a very nice application of the Stream API and partitioned state. :D
> > > > > > > >
> > > > > > > > I think we should run some tests on a cluster based on this to see what kind of throughput the partitioned state system can handle and also how it behaves with larger numbers of keys. The KVStore is just an interface and the really heavy lifting is done by the state system so this would be a good test for it.
> > > > > > > >
> > > > > > > > On Tue, 8 Sep 2015 at 15:10 Gyula Fóra <[hidden email]> wrote:
> > > > > > > >
> > > > > > > > > @Stephan:
> > > > > > > > >
> > > > > > > > > Technically speaking this is really just a partitioned key-value state and a fancy operator executing special operations on this state.
> > > > > > > > >
> > > > > > > > > From the user's perspective though this is something hard to implement. If you want to share state between two streams for instance this way (getting updates from one stream and enriching the other one) you would probably use a connected datastream and custom implement the key-value store logic. But once you have one (or more) update stream and many get streams this implementation will not work. So either the user ends up replicating the whole state in multiple connected operators, or custom implements some inefficient wrapper class to take care of all the put/get operations.
> > > > > > > > >
> > > > > > > > > The idea behind this is to give a very simple abstraction for this type of processing that uses the flink runtime efficiently instead of relying on custom implementations.
> > > > > > > > >
> > > > > > > > > Let me give you a stupid example:
> > > > > > > > >
> > > > > > > > > You receive temperature data in the form of (city, temperature), and you are computing a rolling avg for each city.
> > > > > > > > > Now you have 2 other incoming streams: first is a stream of some other info about the city, let's say population (city, population), and you want to combine it with the last known avg temperature to produce (city, temp, pop) triplets. The second stream is a pair of cities (city,city) and you are interested in the difference of the temperature.
> > > > > > > > >
> > > > > > > > > For enriching the (city, pop) to (city,temp,pop) you would probably use a CoFlatMap and store the last known rolling avg as state. For computing the (city,city) temperature difference it is a little more difficult, as you need to get the temperature for both cities then combine in a second operator. If you don't want to replicate your state, you have to combine these two problems to a common wrapper type and execute them on the same operator which will keep the avg state.
> > > > > > > > >
> > > > > > > > > With the KVStore abstraction this is very simple:
> > > > > > > > > you create a KVStore<City, Temp>
> > > > > > > > > For enriching you use kvStore.getWithKeySelector() which will give you ((city,pop), temp) pairs and you are done. For computing the difference, you can use kvStore.multiGet(...) and get for the 2 cities at the same time. The kv store will abstract away the getting of the 2 keys separately and merging them so it will return [(city1, t1), (city2,t2)].
> > > > > > > > >
> > > > > > > > > This might be a slightly artificial example but I think it makes the point. Implementing these jobs efficiently is not trivial for the users but I think it is a very common problem.
> > > > > > > > >
> > > > > > > > > Stephan Ewen <[hidden email]> wrote (on Tue, 8 Sep 2015, 14:53):
> > > > > > > > >
> > > > > > > > > > @Gyula
> > > > > > > > > >
> > > > > > > > > > Can you explain a bit what this KeyValue store would do more than the partitioned key/value state?
> > > > > > > > > >
> > > > > > > > > > On Tue, Sep 8, 2015 at 2:49 PM, Gábor Gévay <[hidden email]> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hello,
> > > > > > > > > > >
> > > > > > > > > > > As for use cases, in my old job at Ericsson we were building a streaming system that was processing data from telephone networks, and it was using key-value stores a LOT. For example, keeping track of various state info of the users (which cell are they currently connected to, what bearers do they have, ...); mapping from IDs of users in one subsystem of the telephone network to the IDs of the same users in another subsystem; mapping from IDs of phone calls to lists of IDs of participating users; etc.
> > > > > > > > > > > So I imagine they would like this a lot. (At least, if they were considering moving to Flink :))
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Gabor
> > > > > > > > > > >
> > > > > > > > > > > 2015-09-08 13:35 GMT+02:00 Gyula Fóra <[hidden email]>:
> > > > > > > > > > > > Hey All,
> > > > > > > > > > > >
> > > > > > > > > > > > The last couple of days I have been playing around with the idea of building a streaming key-value store abstraction using stateful streaming operators that can be used within Flink Streaming programs seamlessly.
> > > > > > > > > > > >
> > > > > > > > > > > > Operations executed on this KV store would be fault tolerant as it integrates with the checkpointing mechanism, and if we add timestamps to each put/get/... operation we can use the watermarks to create fully deterministic results. This functionality is very useful for many applications, and is very hard to implement properly with some dedicated kv store.
> > > > > > > > > > > >
> > > > > > > > > > > > The KVStore abstraction could look as follows:
> > > > > > > > > > > >
> > > > > > > > > > > > KVStore<K,V> store = new KVStore<>();
> > > > > > > > > > > >
> > > > > > > > > > > > Operations:
> > > > > > > > > > > >
> > > > > > > > > > > > store.put(DataStream<Tuple2<K,V>>)
> > > > > > > > > > > > store.get(DataStream<K>) -> DataStream<KV<K,V>>
> > > > > > > > > > > > store.remove(DataStream<K>) -> DataStream<KV<K,V>>
> > > > > > > > > > > > store.multiGet(DataStream<K[]>) -> DataStream<KV<K,V>[]>
> > > > > > > > > > > > store.getWithKeySelector(DataStream<X>, KeySelector<X,K>) -> DataStream<KV<X,V>[]>
> > > > > > > > > > > >
> > > > > > > > > > > > For the resulting streams I used a special KV abstraction which lets us return null values.
> > > > > > > > > > > >
> > > > > > > > > > > > The implementation uses a simple streaming operator for executing most of the operations (for multi get there is an additional merge operator) with either local or partitioned states for storing the key-value pairs (my current prototype uses local states). And it can either execute operations eagerly (which would not provide deterministic results), or buffer up operations and execute them in order upon watermarks.
> > > > > > > > > > > >
> > > > > > > > > > > > As for use cases you can probably come up with many, I will save that for now :D
> > > > > > > > > > > >
> > > > > > > > > > > > I have a prototype implementation here that can execute the operations described above (does not handle watermarks and time yet):
> > > > > > > > > > > >
> > > > > > > > > > > > https://github.com/gyfora/flink/tree/KVStore
> > > > > > > > > > > > And also an example job:
> > > > > > > > > > > >
> > > > > > > > > > > > https://github.com/gyfora/flink/blob/KVStore/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/KVStreamExample.java
> > > > > > > > > > > >
> > > > > > > > > > > > What do you think?
> > > > > > > > > > > > If you like it I will work on writing tests and it still needs a lot of tweaking and refactoring. This might be something we want to include with the standard streaming libraries at one point.
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > > Gyula
> >
> > --
> > *Nam-Luc TRAN*
> > R&D Manager
> > EURA NOVA
> > (M) +32 498 37 36 23
> > *euranova.eu <http://euranova.eu>*

--
*Nam-Luc TRAN*
R&D Manager
EURA NOVA
(M) +32 498 37 36 23
*euranova.eu <http://euranova.eu>*
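
To make the flatMap1/flatMap2 pattern from Gyula's reply concrete, here is a minimal plain-Java sketch. It deliberately does not use the Flink API: KeyedEnricher, EnrichmentDemo and the HashMap are hypothetical stand-ins for a CoFlatMap with a partitioned value state, and the keys and values are made up. Under that assumption it shows one possible reading of the question above: the enrichments stream carries update records rather than a state of its own, flatMap2 writes each record into the per-key state, and flatMap1 reads that same state when an event with the same key arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a CoFlatMap with a partitioned (per-key) value state.
// This is NOT the Flink API: the HashMap plays the role of the keyed state.
class KeyedEnricher {
    // One slot per key; in Flink the runtime would scope the state to the current key.
    private final Map<String, String> stateByKey = new HashMap<>();
    private final String defaultValue;

    KeyedEnricher(String defaultValue) {
        this.defaultValue = defaultValue;
    }

    // Input 1 (events): read the state for the event's key and emit the enriched record.
    void flatMap1(String key, String event, List<String> out) {
        String enrichment = stateByKey.getOrDefault(key, defaultValue);
        out.add(key + ":" + event + ":" + enrichment);
    }

    // Input 2 (enrichments): overwrite the state for this key; nothing is emitted.
    void flatMap2(String key, String newState) {
        stateByKey.put(key, newState);
    }
}

class EnrichmentDemo {
    // Replays a small interleaving of the two inputs and returns the emitted records.
    static List<String> run() {
        KeyedEnricher op = new KeyedEnricher("unknown");
        List<String> out = new ArrayList<>();
        op.flatMap1("berlin", "click", out); // no enrichment seen yet for this key
        op.flatMap2("berlin", "pop=3.5M");   // enrichment record arrives for "berlin"
        op.flatMap1("berlin", "view", out);  // now reads the updated per-key state
        op.flatMap1("paris", "click", out);  // other keys are unaffected
        return out;
    }
}
```

EnrichmentDemo.run() returns "berlin:click:unknown", "berlin:view:pop=3.5M" and "paris:click:unknown", in that order. Events that arrive before any enrichment for their key only see the default value, so the order in which the two inputs are interleaved matters.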