Hi all,
Currently, I am working on FLINK-8577, "Implement proctime DataStream to Table upsert conversion" <https://issues.apache.org/jira/browse/FLINK-8577>. A design doc can be found here <https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing>. It has received many valuable suggestions; many thanks to all of you. However, there are some problems that I think need more discussion.

*Terms*

1. *Upsert Stream:* A stream that includes a key definition and whose records are updated. Message types include insert, update, and delete.
2. *Upsert Source:* A source that ingests an upsert stream.
3. *Empty Delete:* For a specific key, the first message is a delete message.

*Problem to be discussed*
How should an UpsertSource handle empty deletes?

*Ways to solve the problem*

1. Throw away empty delete messages in the UpsertSource (personally in favor of this option).
   - Advantages
     - It is sound semantically: an empty table plus a delete message is still an empty table, so losing such deletes does not affect the final result.
     - At present, the operators and functions in Flink assume that the add message for a key is processed before its delete. Throwing away empty deletes in the source means downstream operators do not need to consider them.
   - Disadvantages
     - Maintaining state in the source is expensive, especially for simple SQL such as: UpsertSource -> Calc -> UpsertSink.
2. Throw away empty delete messages when the source generates retractions, otherwise pass them downstream.
   - Advantages
     - Downstream operators do not need to consider empty delete messages when the source generates retractions.
     - Performance is better, since the source does not have to maintain state if it does not generate retractions.
   - Disadvantages
     - Judging whether a downstream operator will receive empty delete messages becomes complicated. We must consider not only the source but also the operators that follow it. Take a join as an example: for upsert_source -> upsert_join, the join receives empty deletes, while for upsert_source -> group_by -> upsert_join it does not, since the empty deletes are absorbed by the group_by.
     - The semantics of processing empty deletes are unclear. Users may find it hard to understand why empty deletes are sometimes passed down and sometimes not.
3. Always pass empty delete messages downstream.
   - Advantages
     - Performance is better, since the source does not have to maintain state if it does not generate retractions.
   - Disadvantages
     - All table operators and functions in Flink need to handle empty deletes.

*Another related problem*
Another related problem is FLINK-9528, "Incorrect results: Filter does not treat Upsert messages correctly" <https://issues.apache.org/jira/browse/FLINK-9528>, which I think should be considered together. The problem in FLINK-9528 is that, for SQL like upsert_source -> filter -> upsert_sink, when the data for a key changes from not being filtered to being filtered, the filter simply drops the upsert message, so the previous version of the row remains in the result.

1. One way to solve the problem is to make the UpsertSource generate retractions.
2. Another way is to make the filter aware of the update semantics (retract or upsert) and convert an upsert message into a delete message if the predicate evaluates to false.

The second way will also generate many empty delete messages. To avoid too many of them, the proposed solution is to maintain filter state at the sink, so that the empty deletes do not put devastating pressure on the physical database.

However, if the UpsertSource can also output empty deletes, these deletes become harder to control: we do not know where they come from or whether they should be filtered out. The ambiguity in the semantics of processing empty deletes leaves users unable to judge whether empty deletes will be emitted.

*My personal opinion*
From my point of view, I think the first option (throw away empty delete messages in the UpsertSource) is the best, not only because the semantics are clearer, but also because the processing logic of the entire table layer can be simpler and thus more efficient. Furthermore, the performance loss is acceptable: we can even store only the key in state when the source does not generate retractions (a rough sketch of this idea follows below).

Any suggestions are greatly appreciated!

Best, Hequn
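PS: to make the "store only the key" idea concrete, here is a minimal, illustrative sketch of how option 1 could look as a keyed function in front of the rest of the query. It assumes upsert messages are encoded as Tuple2<Boolean, Row> (true = upsert, false = delete); the class, state, and field names are made up for illustration and are not part of any existing Flink API.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/** Drops "empty deletes": delete messages for keys that have never been inserted. */
public class DropEmptyDeletes
        extends KeyedProcessFunction<Row, Tuple2<Boolean, Row>, Tuple2<Boolean, Row>> {

    // Only a boolean per key is kept (the "store only the key" optimization),
    // instead of materializing the full row in the source.
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen-key", Types.BOOLEAN));
    }

    @Override
    public void processElement(
            Tuple2<Boolean, Row> msg,
            Context ctx,
            Collector<Tuple2<Boolean, Row>> out) throws Exception {
        boolean isUpsert = msg.f0;
        if (isUpsert) {
            seen.update(true);
            out.collect(msg);
        } else if (Boolean.TRUE.equals(seen.value())) {
            // A real delete: the key was inserted before, so forward it.
            seen.clear();
            out.collect(msg);
        }
        // Otherwise this is an empty delete for an unseen key: swallow it.
    }
}

With this, the state grows with the number of distinct keys, but not with the row width.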
Hi,
Thanks for bringing up this issue here.

I'm not sure whether sometimes swallowing empty deletes could be a problem, or whether always swallowing/forwarding them is better. I guess for most use cases it doesn't matter. Maybe the best for now would be to always forward them, since if they are a problem the user can handle them somehow, either in a custom sink wrapper or in a system downstream from Flink. We could also make this configurable in the future.

However, this seems to me like a much lower priority than the performance implications. Forcing the upsert source to always keep all of the keys in state is not only costly, but in many cases it can be a blocker for executing a query at all; not only for UpsertSource -> Calc -> UpsertSink, but in the future also for joins or ORDER BY (especially with LIMIT).

I would apply the same reasoning to FLINK-9528.

Piotrek
Hi,
Thanks for starting this discussion Hequn! These are tricky questions.

1) Handling empty deletes in the UpsertSource.

I think forwarding empty deletes would only make a semantic difference if the output is persisted in a non-empty external table, e.g., a Cassandra table with existing entries. If we forwarded the delete, we might remove data from the sink table. This could be a desired effect. Therefore, I think we should be able to forward empty deletes, and filtering them out could be a configurable option.

2) Handling upsert messages in Filters.

I think the problem is better described with the following query, which writes to an upsert sink:

SELECT user, count(*) FROM clicks GROUP BY user HAVING count(*) < 10

As long as the count for a user is smaller than 10, an update is passed to the upsert sink. When the count reaches 10, the Filter would need to convert the update message into a delete message (right now, the filter just removes the message completely). This would then happen for every further update of the count, i.e., the upsert sink would need to handle many delete messages for data that is no longer in the external storage (repeated deletes).

There are multiple ways to handle this issue:
* Make the Filter stateful and only convert the first message into a delete, filtering out all subsequent updates. (Could also be a best-effort cache, LRU, ...)
* Make the UpsertSink operator (optionally) stateful and track all deleted entries. (Could also be a best-effort cache, LRU, ...)
* For the (common?) special case of Aggregation -> Filter, we could offer a dedicated operator that applies the filter within the aggregation.

---

So, we are dealing with two cases here:

1) Deleting what has not been ingested yet (although the result might be in the external sink). I would forward deletes and filter them optionally, i.e., approach 3 by default, with approach 1 as an option.
2) Deleting what has been deleted already. I think having a best-effort cache in the Filter might be the best approach (a rough sketch follows below). The GROUP-BY-HAVING operator might be a nice addition.

IMO, we should not give guarantees that an UpsertSink won't receive repeated deletes. If this is a problem for certain sink systems, we could offer an option for an exact filter based on state.

What do you think?

Best, Fabian
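PS: a hedged sketch of what such a best-effort LRU cache for repeated deletes might look like. This is hypothetical and not an existing Flink operator; the Tuple2<Boolean, Row> message encoding, the key extraction, and the cache size are all assumptions.

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/** Best-effort suppression of repeated deletes using a bounded, operator-local LRU map. */
public class BestEffortDeleteDedup
        extends RichFlatMapFunction<Tuple2<Boolean, Row>, Tuple2<Boolean, Row>> {

    private static final int MAX_ENTRIES = 10_000; // illustrative bound

    // Remembers keys whose last emitted message was a delete; evicts the oldest entries.
    private transient Map<Row, Boolean> lastWasDelete;

    @Override
    public void open(Configuration parameters) {
        lastWasDelete = new LinkedHashMap<Row, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Row, Boolean> eldest) {
                return size() > MAX_ENTRIES;
            }
        };
    }

    @Override
    public void flatMap(Tuple2<Boolean, Row> msg, Collector<Tuple2<Boolean, Row>> out) {
        Row key = extractKey(msg.f1);
        boolean isDelete = !msg.f0;
        if (isDelete && Boolean.TRUE.equals(lastWasDelete.get(key))) {
            return; // best effort: drop a repeated delete for this key
        }
        lastWasDelete.put(key, isDelete);
        out.collect(msg);
    }

    private Row extractKey(Row row) {
        // Placeholder: project the upsert key fields out of the row (here: field 0).
        return Row.of(row.getField(0));
    }
}

The first delete for a key always passes; only subsequent deletes may be dropped, and only on a best-effort basis because entries can be evicted. Setting the bound to zero makes it a no-op, while an unbounded map gives exact deduplication at the cost of unbounded memory.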
In reply to this post by Piotr Nowojski
Hi Hequn,
Thanks for this discussion.

Personally, I'm also in favor of option 3, for two reasons:

(1) A proctime-based upsert table source does not guarantee the records' order, which means empty delete messages may not really be "empty". Simply discarding them may cause semantic problems.
(2) Materializing the table in the source doesn't sound like an efficient solution, especially since the downstream operators may also need to materialize the intermediate tables several times.

Therefore, why not choose a "lazy strategy", i.e., just forward the messages and let the operators that are sensitive to empty deletes deal with them?

As for the filtering problem, maybe the best approach would be to cache all the keys that meet the criteria and send a retract message when that changes.

BTW, recently I'm getting a stronger and stronger feeling that maybe we should merge the retract message and the upsert message into a unified "update message" (append stream vs. update stream); one possible shape is sketched below.

Best,
Xingcan
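PS: purely to illustrate the idea (these types do not exist in Flink; the names and fields are made up), a unified update message could simply carry a change kind plus the row:

/** Hypothetical unified change message: one envelope instead of separate upsert/retract encodings. */
public final class ChangeMessage<T> {

    public enum Kind { INSERT, UPDATE, DELETE }

    private final Kind kind;
    private final T row; // for DELETE, the row (or just its key) being removed

    public ChangeMessage(Kind kind, T row) {
        this.kind = kind;
        this.row = row;
    }

    public Kind getKind() {
        return kind;
    }

    public T getRow() {
        return row;
    }
}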
For the record, I was also in favour of option 3 regarding empty deletes.
About stateful Filters on an upsert stream. Hequn, I don't fully understand this:

> However, if UpsertSource can also output empty deletes, these
> empty deletes will be more difficult to control. We don't know where these
> deletes come from, and whether should be filtered out. The ambiguity of the
> semantics of processing empty deletes makes the user unable to judge
> whether there will be empty deletes output.

What's the problem with a best-effort "filter out subsequent/redundant deletes"? Our best-effort deletes cache could have the semantics: always pass the first delete, and try to filter out subsequent ones.

Also, instead of doing this delete caching inside the Filter, shouldn't it be implemented as a separate operator that we can place in the plan wherever we deem best (e.g. at the end of the pipeline, just before the sink)? That would give us more flexibility in the future, including extending it into a "frequent updates cache".

Regarding a filter following a GROUP BY: we might need to think this through. I might be wrong here (I haven't thought it through myself), but isn't it true that all filters are pushed down until they reach one of the following:
- source
- aggregation
- join/correlate (?)
- set operations (like intersect/difference)

Except for the source, all of these already have state, so the "best effort cache" could be merged into them.

Another thing to consider is the primary key of the upsert stream. For example, if the primary key matches the aggregation key of the GROUP BY, the aggregation is trivial and could be converted into a simple projection. In Fabian's example:

SELECT user, count(*) FROM clicks GROUP BY user HAVING count(*) < 10

if the primary key is user, the query is trivial: the count will always be at most 1. If the primary key differs, deduplicating deletes would require adding a separate index to handle the deduplication efficiently, and in that case the index's size might be equal to the state needed to fully deduplicate deletes in a separate operator.

Piotrek
In reply to this post by Xingcan Cui
Hi all,
Thanks a lot for your replies, @Piotr Nowojski @Fabian Hueske @Xingcan Cui

Option 3 will send empty deletes to the downstream operators, but the downstream operators don't seem to know how to deal with them. Assume that the upsert source forwards empty deletes by default; let's see what problems we get.

*Tables*
Suppose we have two upsert tables with keys. One is category (key: cate_id), the other is item (key: item_id).

-----category-----
cate_id, cate_v
(delete a, 1)

-----item-----
item_id, cate_id, item_v
(add 2, a, 1)
(add 2, a, 2)
(delete 1, a, 1)
(add 1, a, 2)

*Cases*

case 1: source

  insert into xxx
  select * from item;

In this case, forwarding the deletes removes data from the sink table. There seems to be no problem here.

case 2: join

  insert into xxx
  select * from item join category on item.cate_id = category.cate_id;

Similar to case 1: would the user want to delete all items of category 'a'? For the record (delete a, 1) from category, should it be filtered out by the join, or be joined with all items from the item table?
If it is filtered out, that is semantically in conflict with case 1.
If it is not filtered out, we have to store delete messages in the join state, since data from the item table may arrive later. How to deal with these deletes in state is also a problem; I think the join logic would become very complicated.

case 3: aggregate

  insert into xxx
  select sum(item_v) from item

What is the result of the sum, 4 or 3?
If the answer is 4, how can the downstream group_by tell whether a delete is an empty delete or a retract delete? (The changelog from the item table to the group_by is: add(1), retract(1), add(2), empty delete(1), add(2).) We could probably add a message type to solve this, but that would be a large change and I don't think now is the time for it.
If the answer is 3, the result is definitely wrong.

case 4: Filter

  insert into xxx
  select item_id, item_v from item where item_v < 2 and item_id = 1

For the item with item_id = 1, if the Filter also generates empty deletes, the two kinds of deletes cannot be distinguished by the sink. The question is: should we filter empty deletes in the sink?
If yes, that is semantically in conflict with case 1.
If no, the deletes generated by the Filter cannot be filtered out, which can put devastating pressure on the physical database.

Furthermore, I don't think we can rely on best effort. Consider the anti-spamming scenario: users may only want the top 1% of the data from the result, so the remaining 99% of the data all becomes delete messages after the Filter. That would be a disaster for the storage, especially when most of the incoming keys are new and cannot be absorbed by a best-effort cache. We cannot release a version of Flink that risks bringing such a disaster to our users, even if the chance is small.

Thus, I don't think we should allow empty deletes to be output from the source. It would not only add a new feature, but also a new concept: a dynamic table that allows empty deletes. Once there are empty deletes in a dynamic table, we need to support processing them in all table operators, and we have to define clear semantics for them.

Thanks again for all your replies and patience.

Best, Hequn
Hi,
Thanks for the very detailed explanation :) Please check my inlined responses.

> On 22 Aug 2018, at 14:28, Hequn Cheng <[hidden email]> wrote:
>
> -----item-----
> item_id, cate_id, item_v
> (add 2, a, 1)
> (add 2, a, 2)
> (delete 1, a, 1)
> (add 1, a, 2)

I'm assuming that we are talking about event time and that `(delete 1, a, 1)` happened before `(add 1, a, 2)`, right?

> case 2: join
> Similar to case 1: would the user want to delete all items of category 'a'? For the record (delete a, 1) from category, should it be filtered out by the join, or be joined with all items from the item table?

Maybe we should say that this is unspecified behaviour, and Flink can either forward empty deletes or filter them out depending on what's more efficient/easier. In the end, this whole topic is very fishy. Look at it from Flink's perspective: the user is trying to remove data that was never there in the first place. If it wasn't there, why should we retract/produce deletes for future records/join results? Should Flink "imagine" that there was a record it hasn't seen? Should we apply this logic to other records Flink "hasn't seen"?

Another potential issue: previously I assumed that upsert deletes do not have to carry the data they are removing, just a delete flag and the primary key. If that's the case and there is an aggregation after the join, how could we even handle an empty delete? We wouldn't be able to produce a valid retract message for the follow-up aggregation.

> case 3: aggregate
> What is the result of the sum, 4 or 3?

If item is an upsert stream with item_id as the primary key, the answer can only be 4. However, I do not see any problem here, since aggregations require updates as retractions anyway; thus we can handle empty deletes in the UpsertToRetractionsOperator however we wish: filtering them out there, or marking them as "empty deletes" for forwarding. Marked "empty deletes" would have to be handled by the aggregation in a special way: forward them, but do not update the state.

> case 4: Filter
> For the item with item_id = 1, if the Filter also generates empty deletes, the two kinds of deletes cannot be distinguished by the sink. The question is: should we filter empty deletes in the sink?

I also see different problems here. What if, again, as I mentioned previously, the delete message has no content? What if the delete message has incorrect content that doesn't match the latest update? Shouldn't a filter always pass a delete message without looking into its content?

Again, I fail to see a semantics problem here. The data was not there in the first place, so whether we forward the delete or not shouldn't matter from a semantics perspective: the end result is still the same. If it does matter, the user should let Flink ingest the stream from some valid/consistent point, not in the middle of it. For me, the only valid concern regarding empty deletes is the performance perspective.

> Furthermore, I don't think we can rely on best effort. Consider the anti-spamming scenario: users may only want the top 1% of the data from the result, so the remaining 99% of the data all becomes delete messages after the Filter.

What you are saying is highly use-case specific. If you enforce full removal of empty deletes in this case, you will end up with terrible performance in other cases/setups. I think a configurable best-effort cache is still the way to go in the long run. It could be configured with the number of entries/bytes stored, where 0 means no-op and +INFINITY means a full/perfect cache. However, I'm not even sure how high a priority it should get.

> Thus, I don't think we should allow empty deletes to be output from the source.

If we decide to go this way, why even bother with upsert streams and supporting them inside our pipeline? Why not convert everything to retractions in the sources and always work on retractions? It would simplify our code and we wouldn't need the logic from `DataStreamRetractionRules`.

To sum up, I really fail to see any semantic problems here. Regardless of whether we forward or filter out empty deletes, the end result is always the same: there was no record there in the first place. In set theory, the following operations give the same outcome:

{1, 2, 3} \ {4}
{1, 2, 3} \ {}
{1, 2, 3} \ {42}

If there was a record there because the user started Flink in the middle of a stream, the result is still undefined (the join case that you mentioned), since Flink could skip or ingest any number of extra messages (deletes or not).

Piotrek
Hi Piotrek,
Great to see your replies, and thanks again for all your suggestions. Inline is a good way; I will do the same as you :-)

*> I'm assuming that we are talking about event time and that `(delete 1, a, 1)` happened before `(add 1, a, 2)`, right?*

We are talking about processing time (FLINK-8577 <https://issues.apache.org/jira/browse/FLINK-8577>). Event time is the next topic (FLINK-8578 <https://issues.apache.org/jira/browse/FLINK-8578>). `(delete 1, a, 1)` is an empty delete message that comes before `(add 1, a, 2)`. However, (add 2, a, 1) & (add 2, a, 2) may come before or after (delete 1, a, 1) & (add 1, a, 2).

*> Maybe we should say that this is unspecified behaviour, and Flink can either forward empty deletes or filter them out depending on what's more efficient/easier.*

I don't think unspecified behavior is a good way to go. Yes, from Flink's perspective, I think we should not retract/produce deletes for future records/join results if the record wasn't there, and this behavior should be consistent among all operators, including the upsert source. We should not imagine that there was a record Flink hasn't seen; hence we should throw such deletes away in the source.

*> Another potential issue: previously I assumed that upsert deletes do not have to carry the data they are removing, just a delete flag and the primary key.*

Yes, it is true that we can't handle such empty deletes in Flink. As I said, the downstream operators don't even know how to deal with them.

*> If item is an upsert stream with item_id as the primary key, the answer can only be 4.*

Agreed, the answer should only be 4. Since you are in favor of option 3, I think you are in favor of forwarding empty deletes from the UpsertToRetractionsOperator, as that operator is part of the upsert source (the UpsertToRetractionsOperator is one way to implement an upsert source; there are other ways). And yes, we could add a message type to distinguish empty deletes from retractions. What concerns me is that we would have to adapt all operators to the new message type. What's more, these empty deletes are useless for the downstream operators (even for the sink), because the key dimension changes.

*> Shouldn't a filter always pass a delete message without looking into its content?*

No, the Filter should not always pass delete messages without looking into their content. For SQL like:

  insert into xxx
  select sum(item_v) from item where item_v < 2 and item_id = 1

what is the result of the sum? -1? -2? The changelog is:

  add 1 (item_v=1, item_id=2)            filtered
  retract delete 1 (item_v=1, item_id=2) not filtered, if deletes always pass
  add 2 (item_v=2, item_id=2)            filtered
  empty delete 1 (item_v=1, item_id=1)   not filtered, if deletes always pass
  add 2 (item_v=2, item_id=1)            filtered

*> Again, I fail to see a semantics problem here. The data was not there in the first place, so whether we forward the delete or not shouldn't matter from a semantics perspective: the end result is still the same.*

The semantics problem here is: case 1 passes empty deletes, but case 4 doesn't, just because a filter was added. Whether we forward the delete or not, the result is not the same, and users can perceive this, since the data in their physical storage differs.

*> What you are saying is highly use-case specific. If you enforce full removal of empty deletes in this case, you will end up with terrible performance in other cases/setups. I think a configurable best-effort cache is still the way to go in the long run.*

I think this is a normal use case for big data analysis: diverse keys, but the focus is on a small part of them. Taking a step back, we should not let users take this risk. I agree that keeping state in the source affects performance, but I don't think it is terrible. We can even store only the key in state when the source doesn't generate retractions. The performance of the source is much better than that of the aggregations. Furthermore, there are other ways to improve the performance, for example adding a bloom filter.

*> If we decide to go this way, why even bother with upsert streams and supporting them inside our pipeline? Why not convert everything to retractions in the sources and always work on retractions?*

I think these are two different things: one is how to handle empty deletes, the other is how to handle updates.

*> If there was a record there because the user started Flink in the middle of a stream, the result is still undefined (the join case that you mentioned), since Flink could skip or ingest any number of extra messages (deletes or not).*

I think the result is clear if we clearly define that the upsert source ignores empty deletes. Flink only skips or ingests messages according to the SQL/Table API provided by users.

Thanks, Hequn
Hi Hequn, hi Piotr,
Thanks for pushing this discussion forward, and sorry for not responding earlier.

After reading the thread, I agree that we do not *need* to (but may) forward empty deletes. As I pointed out before, I see empty deletes not as a semantic problem that needs to be solved exactly, but rather as a performance problem that can be optimized (trading off state costs against the handling of empty deletes).

Piotr raised a good point. An upsert delete message may consist only of the key fields and the delete flag. Having no data but the keys means we *cannot* handle them as regular records during a join, filter, projection, or aggregation.

Upsert streams are only present if the stream is received by an operator that is able to correctly interpret the upsert messages. Right now, only the UpsertSink operator can handle upsert streams. Join and Aggregation might support upsert inputs in the future as well. There are the following cases:

1. If there is no operator that can handle an upsert stream, upserts need to be converted into retractions. Filtering out empty deletes while converting to retractions comes for free.
2. If the receiving operator is a Join or Aggregation, it has all the necessary state to check whether the delete is empty or not. In case of an empty delete, it is simply dropped.

In both cases (retract stream conversion and stateful upsert operator) we can filter empty deletes for free.

The only case left is the UpsertSink, which does not have built-in state, since the state is maintained in an external system. As I said before, empty deletes are not a semantic problem. We could forward all empty deletes and the result would still be consistent. However, I understand that empty deletes can cause severe performance issues. We can address the performance issue with different measures, such as best-effort (approximate) filtering or exact state-backed filtering.

I think in many cases we can handle empty deletes from upsert sources without adding additional state. As soon as the upsert messages are converted into retraction messages or consumed by a join or aggregation, they can be filtered for free. We only need to add state if we have an upsert sink AND that sink wants to remove all empty deletes.

There is one more thing that needs to be discussed: how upsert messages are handled by Calc operators. A Calc (projection and/or filter) that receives (and produces) an upsert stream (because it is in front of a Join, Aggregation, or UpsertSink) should handle messages as follows (see the sketch at the end of this mail):

- upsert message / flag=true: upsert messages are handled as regular messages. If the predicate evaluates to false, all but the key fields are set to null and the message is forwarded as a delete message.
- delete message / flag=false: delete messages are converted to the output schema (padded with nulls) and forwarded.

What do you think?

Fabian
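PS: to make the Calc behaviour above concrete, here is a minimal sketch of an upsert-aware filter. It again assumes messages encoded as Tuple2<Boolean, Row> with the key in field 0; the class and the predicate are placeholders for illustration, not the actual Calc code generation:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Non-matching upserts become key-only delete messages; delete messages are
// forwarded without evaluating the predicate.
public class UpsertAwareFilter implements FlatMapFunction<Tuple2<Boolean, Row>, Tuple2<Boolean, Row>> {

    @Override
    public void flatMap(Tuple2<Boolean, Row> msg, Collector<Tuple2<Boolean, Row>> out) {
        if (!msg.f0) {
            // delete: forward as-is (a real Calc would pad it to the output schema)
            out.collect(msg);
        } else if (predicate(msg.f1)) {
            // upsert that passes the filter: forward unchanged
            out.collect(msg);
        } else {
            // upsert that no longer matches: emit a delete that carries only the key field
            Row delete = new Row(msg.f1.getArity());
            delete.setField(0, msg.f1.getField(0));
            out.collect(Tuple2.of(false, delete));
        }
    }

    private boolean predicate(Row row) {
        return ((Integer) row.getField(1)) < 2; // placeholder, e.g. item_v < 2
    }
}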
Hi Fabian,
Thanks for your update. The opinions on upsert streams are highly enlightening. I now agree with you that we can choose option 2 to solve the problem: throw away empty deletes when the source generates retractions, otherwise pass empty deletes down.

As for the UpsertSink, I think we don't need to filter empty deletes in it unless the external system must not receive empty deletes. It would be good to provide an optional parameter in the StreamQueryConfig to indicate whether to filter empty deletes in upsert sinks (i.e., a job-level configuration; see the sketch at the end of this mail). In this way, we can also solve the Filter problem (FLINK-9528). I will create another subtask about the UpsertSink later.

Thanks again for all the suggestions. It really helps me a lot.

Best, Hequn.
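PS: just to sketch the configuration I have in mind. withFilterEmptyDeletes does not exist in StreamQueryConfig today; it is a hypothetical option that would follow the pattern of withIdleStateRetentionTime, and the sink here is only a placeholder:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;

public class UpsertSinkConfigSketch {

    public static void emitWithConfig(TableSink<?> someUpsertSink) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        StreamQueryConfig qConfig = tEnv.queryConfig();
        // HYPOTHETICAL option, not part of StreamQueryConfig yet:
        // drop deletes for keys the upsert sink has never emitted.
        // qConfig.withFilterEmptyDeletes(true);

        Table result = tEnv.sqlQuery(
            "SELECT item_id, item_v FROM item WHERE item_v < 2 AND item_id = 1");
        result.writeToSink(someUpsertSink, qConfig);
    }
}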
Hi Hequn,
That's great! Yes, let's go with option 2 (from the source's point of view) and later extend Join and Aggregation to discard empty deletes.

I agree that the filtering at the sink should be optional and configurable via the query configuration.

Again, thanks for starting this discussion. I think it helped us all to get a better understanding of how upserts work.

Best, Fabian