Hi Flink Devs,
Use cases that I see quite frequently in the real world would benefit from a different watermarking / event time model than the one currently implemented in Flink.

I would call Flink's current approach partition-based watermarking or maybe subtask-based watermarking. In this model the current "event time" is a property local to each subtask instance in a dataflow graph. The event time at any subtask is the minimum of the watermarks it has received on each of its input streams.

There are a couple of issues with this model that are not optimal for some (maybe many) use cases.

1) A single slow subtask (or say source partition) anywhere in the dataflow can mean no progress can be made on the computation at all.

2) In many real world scenarios the time skew across keys can be *many* times greater than the time skew within the data with the same key.

In this discussion I'll use "time skew" to refer to the out-of-orderness with respect to timestamps of the data. Out-of-orderness is a mouthful ;)

Anyway, let me provide an example or two.

In IoT applications the source of events is a particular device out in the world, let's say a device in a connected car application. The data for some particular device may be very bursty, and we will certainly get events from these devices in Flink out-of-order just because of things like partitions in Kafka, shuffles in Flink, etc. However, the time skew in the data for a single device should likely be very small (milliseconds or maybe seconds).

However, in the same application the time skew across different devices can be huge (hours or even days). An obvious example of this, again using connected cars as a representative example, is the following: Car A is recording data locally at 12:00 pm on Saturday but doesn't currently have a network connection. Car B is doing the same thing but does have a network connection. Car A will transmit its data when the network comes back online. Let's say this is at 4pm. Car B was transmitting its data immediately. This creates a huge time skew (4 hours) in the observed datastream when looked at as a whole. However, the time skew in that data for Car A or Car B alone could be tiny. It will be out of order of course, but maybe by only milliseconds or seconds.

What the above means in the end for Flink is that the watermarks must be delayed by up to 4 hours or more because we're looking at the data stream as a whole -- otherwise the data for Car A will be considered late. The time skew in the data stream when looked at as a whole is large even though the time skew for any key may be tiny.

This is the problem I would like to see a solution for. The basic idea of keeping track of watermarks and event time "per-key" rather than per partition or subtask would, I think, solve both of the problems stated above, and both of these are real issues for production applications.

The obvious downside of trying to do this per-key is that the amount of state you have to track is much larger and potentially unbounded. However, I could see this approach working if the keyspace isn't growing rapidly but is stable or grows slowly. The saving grace here is that this may actually be true of the types of applications where this would be especially useful. Think IoT use cases. Another approach to keeping state size in check would be a configurable TTL for a key.
Anyway, I'm throwing this out here on the mailing list in case anyone is interested in this discussion, has thought about the problem deeply already, has use cases of their own they've run into, or has ideas for a solution to this problem.

Thanks for reading.

-Jamie

--
Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
[hidden email]
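To make the 4-hour delay concrete: with the current partition-based model, the only way to keep Car A's data from being treated as late is to give the whole stream a very generous out-of-orderness bound. A rough sketch using Flink's BoundedOutOfOrdernessTimestampExtractor, where the CarEvent type and its getEventTimestamp() accessor are assumed for illustration:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// carEvents: a DataStream<CarEvent> read from e.g. Kafka; CarEvent is a hypothetical POJO.
DataStream<CarEvent> withTimestampsAndWatermarks = carEvents.assignTimestampsAndWatermarks(
        // The bound has to cover the cross-device skew (hours), even though the skew
        // within any single car's data is only milliseconds or seconds.
        new BoundedOutOfOrdernessTimestampExtractor<CarEvent>(Time.hours(4)) {
            @Override
            public long extractTimestamp(CarEvent event) {
                return event.getEventTimestamp();
            }
        });

Holding the watermark back by four hours keeps Car A's buffered data on time, but it also delays every event-time result for Car B (and every other key) by the same four hours.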
Hey Jamie!
Key-based progress tracking sounds like local-only progress tracking to me: there is no need to use a low-watermarking mechanism at all, since all streams of a key are handled by a single partition at a time (per operator). Thus, this could be much easier to implement and support (i.e., no need to broadcast the progress state of each partition all the time). State-wise it should be fine too if it is backed by RocksDB, especially if we have MapState in the future.

Just my quick thoughts on this, to get the discussion going :)

cheers
Paris
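To make the local-only idea concrete: after a keyBy, all records for a given key are processed by the same parallel operator instance, so a per-key notion of progress can live in ordinary keyed state, with no watermark broadcasting involved. A minimal sketch, assuming a simple (key, timestamp) element type; the class name and the update rule are placeholders rather than an existing Flink mechanism:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keeps a purely local, per-key progress value as keyed state.
// Input: Tuple2<deviceId, eventTimestamp>. The state lives in whatever backend is
// configured for the job (e.g. RocksDB), so a large keyspace does not have to fit in heap.
public class LocalPerKeyProgress
        extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> perKeyProgress;

    @Override
    public void open(Configuration parameters) {
        perKeyProgress = getRuntimeContext().getState(
                new ValueStateDescriptor<>("per-key-progress", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> record, Collector<Tuple2<String, Long>> out)
            throws Exception {
        Long progress = perKeyProgress.value();        // scoped to the current key only
        if (progress == null || record.f1 > progress) {
            perKeyProgress.update(record.f1);          // advance this key's local progress
        }
        out.collect(record);
    }
}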
Hey all,
Let me share some ideas about this.

@Paris: The local-only progress tracking indeed seems easier; we do not need to broadcast anything. Implementation-wise it is easier, but performance-wise probably not. If one key can come from multiple sources, there could be a lot more network overhead with per-key tracking than with broadcasting, somewhat paradoxically. Say source instance S1 sends messages and watermarks to operator instances O1 and O2. In the broadcasting case, S1 would send one message to O1 and one to O2 per watermark (of course it depends on how fast the watermarks arrive), a total of 2. However, if we keep track of per-key watermarks, S1 would need to send watermarks for every key directed to O1, and likewise for O2. So if 10 keys go from S1 to O1, and 10 keys from S1 to O2, then (if watermarks arrive at the same rate per key as per source in the previous case) S1 would send a total of 20 watermarks.

Another question is how large the state per key is. If it's really small (an integer maybe, or the state of a small state machine), then the overhead of keeping track of a (Long) watermark is large memory-wise. E.g. an Int state vs. a Long watermark results in 3x as large state. Also, the checkpointing would be ~3x as slow. Of course, for large states a Long watermark would not mean much overhead.

We could resolve the memory issue by using some kind of sketch data structure. Right now the granularity of watermark handling is per-operator-instance. On the other hand, per-key granularity might be costly. What if we increased the granularity of watermarks inside an operator by keeping more than one watermark tracker in one operator? This could be quite simply done with a hash table. With a hash table of size 1, we would get the current semantics (per-operator-instance granularity). With a hash table large enough to have at most one key per bucket, we would get per-key watermark tracking. In between lies the trade-off between handling time skew and a lot of memory overhead. This does not seem hard to implement.

Of course, at some point we would still need to take care of watermarks per key. Imagine that keys A and B go to the same bucket of the hash table, and watermarks are coming in like this: (B,20), (A,10), (A,15), (A,40). Then the watermark of the bucket should be the minimum as time passes, i.e. 0, 10, 15, 20. For this we need to keep track of the watermarks of A and B separately. But after we have a correct watermark for the bucket, all we need to care about is the bucket watermark. So somewhere (most probably at the source) we would have to pay the memory overhead of tracking every key, but nowhere else in the topology.

Regarding the potentially large network overhead, the same compression could be useful. I.e. we would not send watermarks from one operator per key, but rather per hash bucket. Again, the trade-off between time skew and memory consumption is configurable by the size of the hash table used.

Cheers,
Gabor
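A minimal sketch of the hash-bucket idea, in which per-key minimums are kept only where keys are first tracked (e.g. at the source) and only the per-bucket watermark would be forwarded downstream. The class and method names are invented for illustration:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Tracks watermarks at a configurable granularity: numBuckets = 1 reproduces today's
// per-operator-instance behaviour, "one key per bucket" approaches per-key tracking.
public class BucketedWatermarkTracker<K> {

    private final int numBuckets;
    // Per-key watermarks, needed to compute a correct minimum within each bucket.
    private final Map<K, Long> perKeyWatermarks = new HashMap<>();
    // The compressed view that would actually be forwarded downstream.
    private final long[] bucketWatermarks;

    public BucketedWatermarkTracker(int numBuckets) {
        this.numBuckets = numBuckets;
        this.bucketWatermarks = new long[numBuckets];
        Arrays.fill(bucketWatermarks, Long.MIN_VALUE);
    }

    private int bucketOf(K key) {
        return (key.hashCode() & 0x7fffffff) % numBuckets;
    }

    /** Updates the watermark for a key and returns the resulting bucket watermark. */
    public long onWatermark(K key, long watermark) {
        perKeyWatermarks.merge(key, watermark, Math::max);
        int bucket = bucketOf(key);
        // The bucket watermark is the minimum over all keys mapping to this bucket
        // (keys not yet seen simply don't constrain the bucket yet).
        long min = Long.MAX_VALUE;
        for (Map.Entry<K, Long> e : perKeyWatermarks.entrySet()) {
            if (bucketOf(e.getKey()) == bucket) {
                min = Math.min(min, e.getValue());
            }
        }
        bucketWatermarks[bucket] = min;
        return min;
    }
}

Recomputing the bucket minimum by scanning all keys is only for readability; a real implementation would maintain per-bucket structures instead of scanning.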
This is indeed an interesting topic, thanks for starting the discussion,
Jamie!

I've now thought about this for a while, since more and more people seem to be asking about it lately. First, I thought that per-key watermark handling would not be necessary because it can be done locally (as Paris suggested), then I realised that that's not actually the case and thought that this wouldn't be possible. In the end, I came to realise that it is indeed possible (with some caveats), although with a huge overhead in the amount of state that we have to keep and with changes to our API. I'll try and walk you through my thought process.

Let's first look at local watermark tracking, that is, tracking the watermark locally at the operator that needs it, for example a WindowOperator. I initially thought that this would be sufficient. Assume we have a pipeline like this:

Source -> KeyBy -> WindowOperator -> ...

If we have parallelism=1, then all elements for a given key k will be read by the same source operator instance and they will arrive (in order) at the WindowOperator. It doesn't matter whether we track the per-key watermarks at the Source or at the WindowOperator because we see the same elements in the same order at each operator, per key.

Now, think about this pipeline:

Source1 --+
          |-> Union -> KeyBy -> WindowOperator -> ...
Source2 --+

(you can either think about two sources or one source that has several parallel instances, i.e. parallelism > 1)

Here, both Source1 and Source2 can emit elements with our key k. If Source1 is faster than Source2 and the watermarking logic at the WindowOperator determines the watermark based on the incoming element timestamps (for example, using the BoundedOutOfOrdernessTimestampExtractor), then the elements coming from Source2 will be considered late at the WindowOperator.

From this we know that our WindowOperator needs to calculate the watermark similarly to how watermark calculation currently happens in Flink: the watermark is the minimum of the watermarks of all upstream operations. In this case it would be: the minimum of the upstream watermarks of operations that emit elements with key k. For per-partition watermarks this works because the number of upstream operations is known and we simply keep an array that has the current upstream watermark for each input operation. For per-key watermarks this would mean that we have to keep k*u upstream watermarks, where u is the number of upstream operations. This can be quite large. Another problem is that the observed keys change, i.e. the key space is evolving, and we need to retire keys from our calculations lest we run out of space.

We could find a solution based on a feature we recently introduced in Flink: https://github.com/apache/flink/pull/2801. The sources keep track of whether they have input and signal to downstream operations whether they should be included in the watermark calculation logic. A similar thing could be done per-key, where each source signals to downstream operations that there is a new key and that we should start calculating watermarks for it. When a source determines that no more data will come for a key (which in itself is a bit of a tricky problem) then it should signal to downstream operations to take the key out of watermark calculations, that is, that we can release some space.

The above is analysing, on a purely technical level, the feasibility of such a feature. I think it is feasible but can be very expensive in terms of state size requirements. Gabor also pointed this out above and gave a few suggestions on reducing that size.
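To picture the k*u bookkeeping this implies: a downstream operator would keep, for every key, one watermark per upstream input and take the minimum across them. A minimal sketch (the class is invented for illustration, not an existing Flink component):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// For each key, remembers the latest watermark received from each of the u upstream
// inputs; the per-key watermark is the minimum across those inputs.
public class PerKeyMinWatermarkTracker<K> {

    private final int numUpstreamInputs;                                 // u
    private final Map<K, long[]> upstreamWatermarks = new HashMap<>();   // k entries of size u

    public PerKeyMinWatermarkTracker(int numUpstreamInputs) {
        this.numUpstreamInputs = numUpstreamInputs;
    }

    /** Records a per-key watermark from one upstream input and returns the key's new watermark. */
    public long onWatermark(K key, int upstreamIndex, long watermark) {
        long[] perInput = upstreamWatermarks.computeIfAbsent(key, k -> {
            long[] init = new long[numUpstreamInputs];
            Arrays.fill(init, Long.MIN_VALUE);
            return init;
        });
        perInput[upstreamIndex] = Math.max(perInput[upstreamIndex], watermark);

        long min = Long.MAX_VALUE;
        for (long wm : perInput) {
            min = Math.min(min, wm);
        }
        return min;   // only advances once every upstream input has advanced for this key
    }

    /** Called when a key is retired, e.g. after a source signals that no more data will come for it. */
    public void retireKey(K key) {
        upstreamWatermarks.remove(key);
    }
}

With k keys and u upstream inputs this is k*u longs of tracking state per operator, which is exactly the cost described above; retireKey corresponds to the "take the key out of watermark calculations" signal.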
We would also need to change our API to allow tracking the lineage of keys or to enforce that a key stays the same throughout a pipeline. Consider this pipeline:

Source -> KeyBy1 -> WindowOperator -> KeyBy2 -> WindowOperator

where KeyBy1 and KeyBy2 extract a different key, respectively. How would watermarks be tracked across this change of keys? Would we know which of the prior keys end up being keys according to KeyBy2, i.e. do we have some kind of key lineage information?

One approach for solving this would be to introduce a new API that allows extracting a key at the source and will keep this key on the elements until the sink. For example:

DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);
input
  .map(...)
  .window(...) // notice that we don't need keyBy because it is implicit
  .reduce(...)
  .map(...)
  .window(...)
  ...

The DeluxeKeyedStream (name preliminary ;-) would allow the operations that we today have on KeyedStream and on DataStream, and it would always maintain the key that was assigned at the sources. The result of each operation would again be a DeluxeKeyedStream. This way, we could track watermarks per key.

I know it's a bit of a (very) lengthy mail, but what do you think?
Throwing in some thoughts:
On February 27, 2017 at 9:37:27 PM, Aljoscha Krettek ([hidden email]) wrote:

> When a source determines that no more data will come for a key (which in
> itself is a bit of a tricky problem) then it should signal to downstream
> operations to take the key out of watermark calculations, that is, that we
> can release some space.

I don’t think this is possible without exposing API for the UDF to signal that there will be no more data for a specific key. We could detect idleness of a key at the source operator, but without any help from user logic it can essentially only be seen as "temporarily idle", which is not helpful in reducing the state, as the watermark state for that key still needs to be kept downstream.

So to achieve this, I think the only option would be to expose new APIs here too.

It’s like how we recently exposed a new `markAsTemporarilyIdle` method in the SourceFunction.SourceContext interface, but instead a `markKeyTerminated` that must be called by the source UDF to be able to save state space, with no feasible fallback detection strategy.

> DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);
> input
>   .map(...)
>   .window(...) // notice that we don't need keyBy because it is implicit
>   .reduce(...)
>   .map(...)
>   .window(...)
>   ...

Would this mean that another `keyBy` isn’t allowed downstream? Or still allowed, but we’re using the keys in `DeluxeKeyedStream` as the “meta key” to track key lineage?
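For illustration only, since nothing like this exists in Flink today: the kind of source-side contract described above might look roughly like the following, with `markKeyTerminated` as the hypothetical addition next to the existing `markAsTemporarilyIdle`.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical extension of the source context. The user's source function would call
// markKeyTerminated(key) once it knows that a key will never produce data again, so that
// downstream operators can drop the per-key watermark state for it.
public interface KeyAwareSourceContext<T, K> extends SourceFunction.SourceContext<T> {

    /** Signals that no more elements will ever be emitted for the given key. */
    void markKeyTerminated(K key);
}

A connected-car source, for example, would call ctx.markKeyTerminated(deviceId) when a device is decommissioned, which is exactly the kind of knowledge only the user code can provide.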
@Tzu-Li Yes, the deluxe stream would not allow another keyBy(). Or we could
allow it, but then we would exit the world of the deluxe stream and per-key watermarks and go back to the realm of normal streams and keyed streams.
Thinking about this a bit more...
I think it may be interesting to enable two modes for event-time advancement in Flink:

1) The current mode, which I'll call partition-based, pessimistic event-time advancement
2) Key-based, eager event-time advancement

In the key-based, eager mode it's actually quite simple, and it basically becomes a completely local thing, as Paris stated. In this mode you would advance event time, per key, along with the maximum (adjusted) timestamp you've seen rather than the minimum. So the current event time at any node for some key is simply the maximum timestamp you've seen for that key, adjusted (as now) by the logic of a timestamp extractor -- for example the BoundedOutOfOrderness extractor. This is very simple and could work well as long as the delay used in the event-time calculation is enough to cover the real time skew you're likely to observe for any single key.

I wonder how well this might work in practice?
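For illustration only, here is roughly what that eager, per-key tracking could look like in plain Java. This is not existing Flink API; the class name, the out-of-orderness parameter, and the methods are all made up for the sketch.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: eager, per-key event time. The event time of a key advances with the
    // maximum timestamp seen for that key, minus an assumed bound on the
    // out-of-orderness *within* that key (milliseconds to seconds in the IoT example).
    public class PerKeyEagerEventTime<K> {

        private final long maxOutOfOrdernessMillis;
        private final Map<K, Long> maxTimestampPerKey = new HashMap<>();

        public PerKeyEagerEventTime(long maxOutOfOrdernessMillis) {
            this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        }

        // Called for every element of the key; returns the key's new event time.
        public long onElement(K key, long timestamp) {
            long max = Math.max(timestamp, maxTimestampPerKey.getOrDefault(key, Long.MIN_VALUE));
            maxTimestampPerKey.put(key, max);
            return max - maxOutOfOrdernessMillis;
        }

        // An element is late for its key if it is older than the key's current event time.
        public boolean isLate(K key, long timestamp) {
            Long max = maxTimestampPerKey.get(key);
            return max != null && timestamp < max - maxOutOfOrdernessMillis;
        }
    }

A nice property of this sketch is that a slow key only holds back itself: no other key's event time waits for it.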
On Tue, Feb 28, 2017 at 6:22 AM, Aljoscha Krettek <[hidden email]> wrote:

> @Tzu-Li Yes, the deluxe stream would not allow another keyBy(). Or we could allow it, but then we would exit the world of the deluxe stream and per-key watermarks and go back to the realm of normal streams and keyed streams.

> On Tue, 28 Feb 2017 at 10:08 Tzu-Li (Gordon) Tai <[hidden email]> wrote:

> > Throwing in some thoughts:

> > "When a source determines that no more data will come for a key (which in itself is a bit of a tricky problem) then it should signal to downstream operations to take the key out of watermark calculations, that is that we can release some space."

> > I don't think this is possible without exposing an API for the UDF to signal that there will be no more data for a specific key. We could detect idleness of a key at the source operator, but without any help from user logic it can essentially only be seen as "temporarily idle", which is not helpful in reducing the state, as the watermark state for that key still needs to be kept downstream.

> > So to achieve this, I think the only option would be to expose new APIs here too.

> > It's like how we recently exposed a new `markAsTemporarilyIdle` method in the SourceFunction.SourceContext interface, but instead a `markKeyTerminated` that must be called by the source UDF in order to save state space, since we have no feasible fallback detection strategy.

> > DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);
> > input
> >   .map()
> >   .window(...) // notice that we don't need keyBy because it is implicit
> >   .reduce(...)
> >   .map(...)
> >   .window(...)
> >   ...

> > Would this mean that another `keyBy` isn't allowed downstream? Or still allowed, but we're using the keys in `DeluxeKeyedStream` as the "meta key" to track key lineage?
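To make that `markKeyTerminated` idea a bit more concrete, a hypothetical shape for such an API could look like the sketch below. This interface does not exist in Flink; every name in it is invented for illustration, and only the general idea -- an explicit per-key termination signal from the source UDF -- comes from the discussion above.

    // Hypothetical sketch only -- not part of Flink's SourceFunction.SourceContext.
    public interface KeyAwareSourceContext<K, T> {

        // Emit an element together with the key it belongs to and its event timestamp.
        void collectWithKeyAndTimestamp(K key, T element, long timestamp);

        // Per-subtask idleness signal, in the spirit of the existing markAsTemporarilyIdle().
        void markAsTemporarilyIdle();

        // Hypothetical: the source promises that no more data will ever arrive for this
        // key, so downstream operators may drop the key from their watermark bookkeeping.
        void markKeyTerminated(K key);
    }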
> > On February 27, 2017 at 9:37:27 PM, Aljoscha Krettek ([hidden email]) wrote:

> > This is indeed an interesting topic, thanks for starting the discussion, Jamie!

> > I now thought about this for a while, since more and more people seem to be asking about it lately. First, I thought that per-key watermark handling would not be necessary because it can be done locally (as Paris suggested), then I realised that that's not actually the case and thought that it wouldn't be possible. In the end, I came to realise that it is indeed possible (with some caveats), although with a huge overhead in the amount of state that we have to keep, and with changes to our API. I'll try and walk you through my thought process.

> > Let's first look at local watermark tracking, that is, tracking the watermark locally at the operator that needs it, for example a WindowOperator. I initially thought that this would be sufficient. Assume we have a pipeline like this:

> > Source -> KeyBy -> WindowOperator -> ...

> > If we have parallelism=1, then all elements for a given key k will be read by the same source operator instance and they will arrive (in order) at the WindowOperator. It doesn't matter whether we track the per-key watermarks at the Source or at the WindowOperator, because we see the same elements in the same order at each operator, per key.

> > Now, think about this pipeline:

> > Source1 --+
> >           |-> Union -> KeyBy -> WindowOperator -> ...
> > Source2 --+

> > (you can either think of two sources or of one source that has several parallel instances, i.e. parallelism > 1)

> > Here, both Source1 and Source2 can emit elements with our key k. If Source1 is faster than Source2 and the watermarking logic at the WindowOperator determines the watermark based on the incoming element timestamps (for example, using the BoundedLatenessTimestampExtractor), then the elements coming from Source2 will be considered late at the WindowOperator.

> > From this we know that our WindowOperator needs to calculate the watermark similarly to how watermark calculation currently happens in Flink: the watermark is the minimum of the watermarks of all upstream operations. In this case it would be: the minimum of the upstream watermarks of operations that emit elements with key k. For per-partition watermarks this works because the number of upstream operations is known and we simply keep an array that has the current upstream watermark for each input operation. For per-key watermarks this would mean that we have to keep k*u upstream watermarks, where u is the number of upstream operations. This can be quite large. Another problem is that the observed keys change, i.e. the key space is evolving, and we need to retire keys from our calculations lest we run out of space.

> > We could find a solution based on a feature we recently introduced in Flink: https://github.com/apache/flink/pull/2801. The sources keep track of whether they have input and signal to downstream operations whether they should be included in the watermark calculation logic. A similar thing could be done per key, where each source signals to downstream operations that there is a new key and that we should start calculating watermarks for it. When a source determines that no more data will come for a key (which in itself is a bit of a tricky problem) then it should signal to downstream operations to take the key out of watermark calculations, that is, that we can release some space.

> > The above is analysing, on a purely technical level, the feasibility of such a feature. I think it is feasible, but it can be very expensive in terms of state size requirements. Gabor also pointed this out above and gave a few suggestions on reducing that size.

> > We would also need to change our API to allow tracking the lineage of keys, or to enforce that a key stays the same throughout a pipeline. Consider this pipeline:

> > Source -> KeyBy1 -> WindowOperator -> KeyBy2 -> WindowOperator

> > where KeyBy1 and KeyBy2 extract a different key, respectively. How would watermarks be tracked across this change of keys? Would we know which of the prior keys end up being keys according to KeyBy2, i.e. do we have some kind of key lineage information?

> > One approach for solving this would be to introduce a new API that allows extracting a key at the source and will keep this key on the elements until the sink. For example:

> > DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);
> > input
> >   .map()
> >   .window(...) // notice that we don't need keyBy because it is implicit
> >   .reduce(...)
> >   .map(...)
> >   .window(...)
> >   ...

> > The DeluxeKeyedStream (name preliminary ;-) would allow the operations that we today have on KeyedStream and on DataStream, and it would always maintain the key that was assigned at the sources. The result of each operation would again be a DeluxeKeyedStream. This way, we could track watermarks per key.

> > I know it's a bit of a (very) lengthy mail, but what do you think?
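As a rough illustration of the bookkeeping described above -- one watermark per (key, upstream operation) pair, combined by taking the minimum -- the following sketch might help. It is plain Java, not Flink code; the class, the integer channel index, and the retireKey method are assumptions made only for the example.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: per-key watermark as the minimum over all upstream operations that
    // emit elements for that key. This is the k*u state mentioned above: one Long
    // per (key, upstream channel) pair.
    public class PerKeyMinWatermarkTracker<K> {

        // key -> (upstream channel index -> last watermark seen from that channel for this key)
        private final Map<K, Map<Integer, Long>> upstreamWatermarks = new HashMap<>();

        // Record a per-key watermark from one upstream channel; returns the key's combined watermark.
        public long onWatermark(K key, int channel, long watermark) {
            upstreamWatermarks
                    .computeIfAbsent(key, k -> new HashMap<>())
                    .merge(channel, watermark, Math::max);  // per-channel watermarks only move forward
            return currentWatermark(key);
        }

        // The key's event time is the minimum over all channels that have reported for it.
        public long currentWatermark(K key) {
            Map<Integer, Long> perChannel = upstreamWatermarks.get(key);
            if (perChannel == null || perChannel.isEmpty()) {
                return Long.MIN_VALUE;
            }
            return perChannel.values().stream().mapToLong(Long::longValue).min().getAsLong();
        }

        // Retire a key, e.g. after a "no more data for this key" signal, to release state.
        public void retireKey(K key) {
            upstreamWatermarks.remove(key);
        }
    }

Retiring a key, as in retireKey above, is exactly the point where the per-key termination signal from the sources would be needed.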
> > On Thu, 23 Feb 2017 at 11:14 Gábor Hermann <[hidden email]> wrote:

> > > Hey all,

> > > Let me share some ideas about this.

> > > @Paris: The local-only progress tracking indeed seems easier, we do not need to broadcast anything. Implementation-wise it is easier, but performance-wise probably not. If one key can come from multiple sources, there could be a lot more network overhead with per-key tracking than with broadcasting, somewhat paradoxically. Say source instance S1 sends messages and watermarks to operator instances O1 and O2. In the broadcasting case, S1 would send one message to O1 and one to O2 per watermark (of course it depends on how fast the watermarks arrive), a total of 2. However, if we keep track of per-key watermarks, S1 would need to send watermarks for every key directed to O1, and likewise for O2. So if 10 keys go from S1 to O1, and 10 keys from S1 to O2, then (if watermarks arrive at the same rate per key as per source in the previous case) S1 would send a total of 20 watermarks.

> > > Another question is how large the state per key is. If it's really small (an integer maybe, or the state of a small state machine), then the overhead of keeping track of a (Long) watermark is large memory-wise. E.g. an Int state vs. a Long watermark results in 3x as large state. Also, the checkpointing would be ~3x as slow. Of course, for large states a Long watermark would not mean much overhead.

> > > We could resolve the memory issue by using some kind of sketch data structure. Right now the granularity of watermark handling is per operator instance. On the other hand, per-key granularity might be costly. What if we increased the granularity of watermarks inside an operator by keeping more than one watermark tracker in one operator? This could be quite simply done with a hash table. With a hash table of size 1, we would get the current semantics (per-operator-instance granularity). With a hash table large enough to have at most one key per bucket, we would get per-key watermark tracking. In between lies the trade-off between handling time skew and a lot of memory overhead. This does not seem hard to implement.

> > > Of course, at some point we would still need to take care of watermarks per key. Imagine that keys A and B go to the same bucket of the hash table, and watermarks are coming in like this: (B,20), (A,10), (A,15), (A,40). Then the watermark of the bucket should be the minimum as time passes, i.e. 0, 10, 15, 20. For this we need to keep track of the watermarks of A and B separately. But after we have a correct watermark for the bucket, all we need to care about is the bucket watermark. So somewhere (most probably at the source) we would have to pay the memory overhead of tracking every key, but nowhere else in the topology.

> > > Regarding the potentially large network overhead, the same compression could be useful. I.e. we would not send watermarks from one operator per key, but rather per hash bucket. Again, the trade-off between time skew and memory consumption is configurable by the size of the hash table used.

> > > Cheers,
> > > Gabor
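A minimal sketch of that hash-bucket idea, under the assumption that each bucket's watermark is the minimum over the per-key watermarks currently assigned to it. The class and method names are made up; per-key watermarks are still tracked inside the buckets, since that is where the memory cost described above lives.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: watermark tracking at a configurable granularity. numBuckets = 1 gives a
    // single watermark for the whole operator instance; a very large numBuckets
    // approaches per-key tracking.
    public class BucketedWatermarkTracker<K> {

        private final int numBuckets;
        private final Map<K, Long>[] perKeyWatermarks;  // one map per bucket: key -> latest watermark

        @SuppressWarnings("unchecked")
        public BucketedWatermarkTracker(int numBuckets) {
            this.numBuckets = numBuckets;
            this.perKeyWatermarks = new Map[numBuckets];
            for (int i = 0; i < numBuckets; i++) {
                perKeyWatermarks[i] = new HashMap<>();
            }
        }

        private int bucketOf(K key) {
            return Math.floorMod(key.hashCode(), numBuckets);
        }

        // Record a per-key watermark and return the watermark of the key's bucket.
        public long onKeyWatermark(K key, long watermark) {
            int bucket = bucketOf(key);
            perKeyWatermarks[bucket].merge(key, watermark, Math::max);  // per-key watermarks only advance
            return bucketWatermark(bucket);
        }

        // The bucket watermark is the minimum over all keys currently tracked in the bucket.
        public long bucketWatermark(int bucket) {
            return perKeyWatermarks[bucket].values().stream()
                    .mapToLong(Long::longValue)
                    .min()
                    .orElse(Long.MIN_VALUE);
        }
    }

Downstream operators would then only need to keep one watermark per bucket per upstream channel instead of one per key, which is the compression -- of both memory and watermark traffic -- described above.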
> > > On 2017-02-23 08:57, Paris Carbone wrote:

> > > > Hey Jamie!

> > > > Key-based progress tracking sounds like local-only progress tracking to me; there is no need to use a low-watermarking mechanism at all, since all streams of a key are handled by a single partition at a time (per operator).

> > > > Thus, this could be much easier to implement and support (i.e., no need to broadcast the progress state of each partition all the time). State-wise it should be fine too if it is backed by RocksDB, especially if we have MapState in the future.

> > > > Just my quick thoughts on this, to get the discussion going :)

> > > > cheers
> > > > Paris

--
Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
[hidden email]