* I mistyped the rejected_query; it should be:

CREATE VIEW post_agg_stream AS
SELECT currencyId, AVG(rate) AS rate
FROM currency_rates
GROUP BY currencyId

CREATE VIEW rejected_query AS
SELECT
    ...
FROM
    transactions AS t
    JOIN post_agg_stream FOR SYSTEM_TIME AS OF t.transactionTime AS r
    ON r.currencyId = t.currencyId

On Mon, Jul 6, 2020 at 11:29 AM Seth Wiesman <[hidden email]> wrote:

> Hey Leonard,
>
> Agreed, this is a fun discussion!
>
>> (1) For supporting changelog sources backed by CDC tools, one problem is
>> whether we can use the temporal table as a general source table that may
>> be followed by aggregation operations; more precisely, whether the
>> aggregation operator can use the DELETE record whose "correct" operation
>> time we just updated to retract a record. Maybe not. This pulls us back
>> to the discussion of operation time vs. event time; it's a really cool
>> but complicated topic, see the discussion above from me and @Jark.
>
> I fully agree this is a complicated topic; however, I don't think it's
> actually a problem that needs to be solved for the first version of this
> feature. My proposal is to disallow using upsert streams as temporal
> tables if an aggregation operation has been applied. Going back to my
> notion that temporal tables are a tool for performing streaming
> star-schema denormalization, the dimension table in a star schema is
> rarely aggregated pre-join. In the case of a CDC stream of currency rates
> joined to transactions, the CDC stream only needs to support filter
> pushdowns and map-like transformations before being joined. I believe
> this is a reasonable limitation we can impose that will unblock a large
> percentage of use cases, and once we better understand the semantics of
> the correct operation in a retraction the limitation can be removed in
> future versions while remaining backward compatible.
>
> CREATE TABLE currency_rates (
>     currencyId BIGINT,
>     rate DECIMAL(10, 2),
>     PRIMARY KEY (currencyId) NOT ENFORCED
> ) WITH (
>     'connector' = 'kafka',
>     'format' = 'debezium-json'
> )
>
> CREATE TABLE transactions (
>     currencyId BIGINT,
>     transactionTime TIMESTAMP(3)
> ) WITH (
>     ...
> )
>
> -- Under my proposal this query would be supported because the
> -- currency_rates table is used in a temporal join without any
> -- aggregations having been applied
>
> CREATE VIEW working_query AS
> SELECT
>     ...
> FROM
>     transactions AS t
>     JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
>     ON r.currencyId = t.currencyId
>
> -- However, this query would be rejected by the planner until we
> -- determine the proper time semantics of a retraction
>
> CREATE VIEW post_agg_stream AS
> SELECT currencyId, AVG(rate) AS rate
> FROM currency_rates
> GROUP BY currencyId
>
> CREATE VIEW rejected_query AS
> SELECT
>     ...
> FROM
>     transactions AS t
>     JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
>     ON r.currencyId = t.currencyId
>
>> (2) For an upsert source that defines a PRIMARY KEY and may contain
>> multiple records under one PK, the main problem is the PK semantics: the
>> multiple records under a PK break the uniqueness semantics of the table.
>> We need to work around this by (a) adding another primary-key keyword
>> and explaining the upsert semantics, or (b) creating the temporal table
>> based on a view that is the deduplicated result of the source table[2].
>
> This feels like more of a bikeshedding question than a blocker and I look
> forward to seeing what you come up with!
>
> Seth
>
> On Mon, Jul 6, 2020 at 10:59 AM Benchao Li <[hidden email]> wrote:
>
>> Hi everyone,
>>
>> Thanks a lot for the great discussions so far.
>>
>> After reading through the long discussion, I still have one question.
>> Currently the temporal table function supports both event-time and
>> processing-time joining. If we use the "FOR SYSTEM_TIME AS OF" syntax
>> without the "TEMPORAL" keyword in DDL, does it mean we can only use the
>> temporal table function join with event time? If we can, how do we
>> distinguish it from the current temporal table (also known as a
>> dimension table)?
>>
>> Maybe I'm missing something here. Correct me if I'm wrong.
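For context, the two mechanisms Benchao compares look roughly like this.
This is only a sketch with illustrative names: Rates is assumed to be a
temporal table function registered over the rates stream via the Table API,
and transactionTime is assumed to be an event-time attribute.

    -- Temporal table function join (the pre-existing mechanism):
    SELECT t.currencyId, r.rate
    FROM transactions AS t,
         LATERAL TABLE (Rates(t.transactionTime)) AS r
    WHERE r.currencyId = t.currencyId;

    -- Temporal table join using the FOR SYSTEM_TIME AS OF syntax under
    -- discussion, against a table declared in DDL:
    SELECT t.currencyId, r.rate
    FROM transactions AS t
    JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
        ON r.currencyId = t.currencyId;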
>>
>> Leonard Xu <[hidden email]> wrote on Mon, Jul 6, 2020 at 11:34 PM:
>>
>>> Hi, Seth
>>>
>>> Thanks for your explanation of the use cases, and you're right: the
>>> lookup join/table is one kind of temporal table join/table, one that
>>> tracks the latest snapshot of an external DB-like table, which is why
>>> we proposed using the same temporal join syntax.
>>>
>>> In fact, I have investigated the Debezium and Canal formats further
>>> these days, and we can obtain the exact DML operation time from their
>>> meta information, which comes from the DB binlog. Although extracting
>>> meta information from records is part of the FLIP-107 scope[1], at
>>> least we have a way to extract the correct operation time. Yet even if
>>> we can obtain the expected operation time, there are still some
>>> problems.
>>>
>>> (1) For supporting changelog sources backed by CDC tools, one problem
>>> is whether we can use the temporal table as a general source table
>>> that may be followed by aggregation operations; more precisely,
>>> whether the aggregation operator can use the DELETE record whose
>>> "correct" operation time we just updated to retract a record. Maybe
>>> not. This pulls us back to the discussion of operation time vs. event
>>> time; it's a really cool but complicated topic, see the discussion
>>> above from me and @Jark.
>>>
>>> (2) For an upsert source that defines a PRIMARY KEY and may contain
>>> multiple records under one PK, the main problem is the PK semantics:
>>> the multiple records under a PK break the uniqueness semantics of the
>>> table. We need to work around this by (a) adding another primary-key
>>> keyword and explaining the upsert semantics, or (b) creating the
>>> temporal table based on a view that is the deduplicated result of the
>>> source table[2].
>>>
>>> I'm working on (2), and if we want to support (1), i.e. support DELETE
>>> entirely, that's really a big challenge, but I also think it's the
>>> right thing for the long term.
>>>
>>> If we decide to do (1), we first need to introduce the operation-time
>>> concept, then change the codebase to handle the operation-time header
>>> in many places, and finally explain to users how to understand and use
>>> temporal tables.
>>>
>>> I'm a little worried about whether it's valuable enough. I proposed
>>> supporting only (2) because it is a good replacement for the current
>>> temporal table function and will not introduce more concepts and work.
>>>
>>> Jark, Jingsong, Konstantin, WDYT?
>>>
>>> Best,
>>> Leonard Xu
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Accessread-onlymetadatae.g.partition
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
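For option (b), the deduplication query that Leonard links as [2] yields a
view along these lines. This is a sketch only: update_time is an assumed
per-record change-time column, for example the operation time extracted from
the changelog meta information.

    -- Keep only the latest row per primary key; the deduplicated view
    -- restores the uniqueness semantics needed for a temporal table.
    CREATE VIEW latest_rates AS
    SELECT currencyId, rate, update_time
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (
                   PARTITION BY currencyId
                   ORDER BY update_time DESC) AS rownum
        FROM currency_rates)
    WHERE rownum = 1;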
>>>
>>>> On Jul 6, 2020, at 22:02, Seth Wiesman <[hidden email]> wrote:
>>>>
>>>> As an aside, I conceptually view temporal table joins as semantically
>>>> equivalent to lookup table joins. They are just two different ways of
>>>> consuming the same data.
>>>>
>>>> Seth
>>>>
>>>> On Mon, Jul 6, 2020 at 8:56 AM Seth Wiesman <[hidden email]> wrote:
>>>>
>>>> Hi Leonard,
>>>>
>>>> Regarding DELETE operations, I tend to have the opposite reaction. I
>>>> spend a lot of time working with production Flink users across a
>>>> large number of organizations, and saying we don't support temporal
>>>> tables that include DELETEs will be a blocker for adoption. Even
>>>> organizations that claim to never delete rows still occasionally do
>>>> so, per GDPR requests or other regulations.
>>>>
>>>> I actually do think users will understand the limitations. Flink
>>>> today has a very clear value proposition around correctness: your
>>>> results are as correct as the input data provided. This does not
>>>> change under support for DELETE records. Flink is providing the most
>>>> correct results possible based on the resolution of the fields as
>>>> generated by third-party systems. As Debezium and other CDC libraries
>>>> become more accurate, so will Flink.
>>>>
>>>> Seth
>>>>
>>>> On Fri, Jul 3, 2020 at 11:00 PM Leonard Xu <[hidden email]> wrote:
>>>>
>>>> Hi, Konstantin
>>>>
>>>>> Would we support a temporal join with a changelog stream with
>>>>> event-time semantics by ignoring DELETE messages, or would it be
>>>>> completely unsupported?
>>>>
>>>> I don't know the percentage of this feature among temporal scenarios.
>>>>
>>>> As for supporting an approximate event-time join by ignoring DELETE
>>>> messages or by extracting an approximate event time for DELETE
>>>> messages, I'm not sure this is acceptable to users even if we explain
>>>> the limitations of an approximate event-time join. I tend not to
>>>> support this feature, because we cannot ensure event-time semantics
>>>> and it may lead to incorrect results for users in some scenarios.
>>>>
>>>> If the percentage is high enough and most use cases can accept the
>>>> approximate event time, I'm OK with supporting it for usability,
>>>> although it doesn't implement event-time semantics strictly.
>>>>
>>>> Cheers,
>>>> Leonard Xu
>>
>> --
>>
>> Best,
>> Benchao Li
Hi all,

I have opened a new discussion thread for FLIP-132[1], which is based on our
consensus in the current thread. Let's keep the communication in the new
thread; please let me know if you have any concerns.

Best,
Leonard

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-132-Temporal-Table-DDL-td43483.html

> On Jul 7, 2020, at 00:31, Seth Wiesman <[hidden email]> wrote:
>
> [...]
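For reference, the overall shape this consensus points toward is roughly the
following: a changelog source declared with a primary key and an event-time
attribute, used directly as a temporal table in a FOR SYSTEM_TIME AS OF
join. This is an illustrative sketch only, with update_time as an assumed
change-time column; the authoritative design is in the FLIP itself.

    -- Sketch: a Debezium-backed changelog table with a primary key and a
    -- watermark on an assumed update_time column.
    CREATE TABLE currency_rates (
        currencyId BIGINT,
        rate DECIMAL(10, 2),
        update_time TIMESTAMP(3),
        WATERMARK FOR update_time AS update_time,
        PRIMARY KEY (currencyId) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'format' = 'debezium-json'
    );

    SELECT t.currencyId, r.rate
    FROM transactions AS t
    JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
        ON r.currencyId = t.currencyId;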