[DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL


Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

Seth Wiesman
* I mistyped the rejected_query, it should be

CREATE VIEW post_agg_stream AS
SELECT currencyId, AVG(rate) AS rate
FROM currency_rates
GROUP BY currencyId

CREATE VIEW rejected_query AS
SELECT ...
FROM transactions AS t
JOIN post_agg_stream FOR SYSTEM_TIME AS OF t.transactionTime AS r
ON r.currencyId = t.currencyId


On Mon, Jul 6, 2020 at 11:29 AM Seth Wiesman <[hidden email]> wrote:

> Hey Leonard,
>
> Agreed, this is a fun discussion!
>
>> (1) For changelog sources backed by CDC tools, one problem is whether we
>> can use the temporal table as a general source table that may be followed
>> by aggregation operations; more precisely, whether the aggregation operator
>> can use a DELETE record, whose operation time we just corrected, to retract
>> a record. Maybe not. This pulls us back to the discussion of operation time
>> vs. event time, a really cool but complicated topic; see the discussion
>> above from me and @Jark.
>>
>
> I fully agree this is a complicated topic; however, I don't think it's
> actually a problem that needs to be solved for the first version of this
> feature. My proposal is to disallow using upsert streams as temporal tables
> if an aggregation operation has been applied. Going back to my notion that
> temporal tables are a tool for performing streaming star schema
> denormalization, the dimension table in a star schema is rarely aggregated
> pre-join. In the case of a CDC stream of currency rates joined to
> transactions, the CDC stream only needs to support filter pushdowns and
> map-like transformations before being joined. I believe this is a
> reasonable limitation we can impose that will unblock a large percentage of
> use cases, and once we better understand the semantics of the correct
> operation in a retraction the limitation can be removed in future versions
> while remaining backward compatible.
>
>
>
> CREATE TABLE currency_rates (
>   currencyId BIGINT PRIMARY KEY,
>   rate DECIMAL(10, 2)
> ) WITH (
>   'connector' = 'kafka',
>   'format' = 'debezium-json'
> )
>
> CREATE TABLE transactions (
>   currencyId BIGINT,
>   transactionTime TIMESTAMP(3)
> ) WITH (
>   ...
> )
>
> -- Under my proposal this query would be supported because the currency_rates
>
> -- table is used in a temporal join without any aggregations having been applied
>
> CREATE VIEW working_query AS
> SELECT ...
> FROM transactions AS t
> JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
> ON r.currencyId = t.currencyId
>
> -- However, this query would be rejected by the planner until we determine the proper time semantics of a retraction
>
> CREATE VIEW post_agg_stream AS
> SELECT currencyId, AVG(rate) AS rate
> FROM currency_rates
> GROUP BY currencyId
>
> CREATE VIEW rejected_query AS
> SELECT ...
> FROM transactions AS t
> JOIN currency_rates FOR SYSTEM_TIME AS OF t.transactionTime AS r
> ON r.currencyId = t.currencyId
>
>
>
>> (2) For an upsert source that defines a PRIMARY KEY but may contain
>> multiple records under one PK, the main problem is the PK semantics:
>> multiple records under a PK break the uniqueness semantics of a table. We
>> need to work around this by (a) adding another primary-key keyword and
>> explaining the upsert semantics, or (b) creating the temporal table based
>> on a view that is the deduplicated result of the source table [2].
>>
>
> This feels like more of a bikeshedding question than a blocker and I look
> forward to seeing what you come up with!
>
> Seth
>
> On Mon, Jul 6, 2020 at 10:59 AM Benchao Li <[hidden email]> wrote:
>
>> Hi everyone,
>>
>> Thanks a lot for the great discussions so far.
>>
>> After reading through the long discussion, I still have one question.
>> Currently the temporal table function supports both event-time and
>> processing-time joins.
>> If we use the "FOR SYSTEM_TIME AS OF" syntax without the "TEMPORAL" keyword
>> in the DDL,
>> does that mean we can only use the temporal table function join with event
>> time? If we can, how do we distinguish it from the current temporal table
>> (also known as a dimension table)?
>>
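>> To make the question concrete, a minimal sketch (the table and column
>> names here are made up for illustration, not from the proposal):
>>
>> -- event-time temporal join: versions resolved by the left table's rowtime
>> SELECT o.amount * r.rate
>> FROM orders AS o
>> JOIN rates FOR SYSTEM_TIME AS OF o.order_time AS r
>> ON r.currency = o.currency
>>
>> -- processing-time (dimension/lookup) join: always the latest version
>> SELECT o.amount * r.rate
>> FROM orders AS o
>> JOIN rates FOR SYSTEM_TIME AS OF o.proc_time AS r
>> ON r.currency = o.currency
>>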
>> Maybe I'm missing something here. Correct me if I'm wrong.
>>
>> On Mon, Jul 6, 2020 at 11:34 PM, Leonard Xu <[hidden email]> wrote:
>>
>>> Hi, Seth
>>>
>>> Thanks for your explanation of the use cases, and you're right: the lookup
>>> join/table is one kind of temporal table join/table, one that tracks the
>>> latest snapshot of external DB-like tables; that's why we proposed using
>>> the same temporal join syntax.
>>>
>>> In fact, I have investigated the Debezium and Canal formats further these
>>> days, and we can obtain the exact DML operation time from their meta
>>> information, which comes from the DB bin-log. Although extracting meta
>>> information from a record is part of the FLIP-107 scope [1], at least we
>>> have a way to extract the correct operation time (see the sketch below).
>>> Even if we can obtain the expected operation time, there are still some
>>> problems.
>>>
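>>> Purely as an illustration, a FLIP-107-style sketch of exposing that
>>> operation time as a metadata column (the exact metadata keys and syntax
>>> are still under discussion in FLIP-107, so treat this as an assumption,
>>> not final syntax):
>>>
>>> CREATE TABLE currency_rates (
>>>   currencyId BIGINT,
>>>   rate DECIMAL(10, 2),
>>>   -- hypothetical metadata column carrying the bin-log operation time
>>>   operation_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp'
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'format' = 'debezium-json'
>>> )
>>>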
>>> (1) For changelog sources backed by CDC tools, one problem is whether we
>>> can use the temporal table as a general source table that may be followed
>>> by aggregation operations; more precisely, whether the aggregation
>>> operator can use a DELETE record, whose operation time we just corrected,
>>> to retract a record. Maybe not. This pulls us back to the discussion of
>>> operation time vs. event time, a really cool but complicated topic; see
>>> the discussion above from me and @Jark.
>>>
>>> (2) For an upsert source that defines a PRIMARY KEY but may contain
>>> multiple records under one PK, the main problem is the PK semantics:
>>> multiple records under a PK break the uniqueness semantics of a table. We
>>> need to work around this by (a) adding another primary-key keyword and
>>> explaining the upsert semantics, or (b) creating the temporal table based
>>> on a view that is the deduplicated result of the source table [2], as
>>> sketched below.
>>>
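>>> For (b), a minimal sketch of such a deduplication view, following the
>>> pattern documented in [2] (the table and column names are assumptions):
>>>
>>> CREATE VIEW versioned_rates AS
>>> SELECT currencyId, rate, update_time
>>> FROM (
>>>   SELECT currencyId, rate, update_time,
>>>     ROW_NUMBER() OVER (
>>>       PARTITION BY currencyId ORDER BY update_time DESC) AS rownum
>>>   FROM currency_rates
>>> )
>>> WHERE rownum = 1
>>>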
>>> I'm working on (2), and if we want to support (1), i.e. support DELETE
>>> entirely, that's really a big challenge, but I also think it's the right
>>> thing for the long term.
>>>
>>> If we decide to do (1), we first need to introduce the operation-time
>>> concept, then change the codebase in many places to handle the
>>> operation-time header, and finally explain to users how to understand and
>>> use temporal tables.
>>>
>>> I'm a little worried about whether it's valuable enough; I proposed
>>> supporting only (2) because it is a good replacement for the current
>>> Temporal Table Function and will not introduce more concepts and work.
>>>
>>> Jark, Jingsong, Konstantin, WDYT?
>>>
>>>
>>> Best,
>>> Leonard Xu
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Accessread-onlymetadatae.g.partition
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>>
>>>
>>> > On Jul 6, 2020, at 22:02, Seth Wiesman <[hidden email]> wrote:
>>> >
>>> > As an aside, I conceptually view temporal table joins as semantically
>>> equivalent to lookup table joins. They are just two different ways of
>>> consuming the same data.
>>> >
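>>> > As a sketch of what I mean (the connector options here are illustrative
>>> > assumptions, not part of the proposal): the same FOR SYSTEM_TIME AS OF
>>> > join could be backed either by the Debezium-fed currency_rates table
>>> > above or by an external lookup table such as:
>>> >
>>> > CREATE TABLE currency_rates_lookup (
>>> >   currencyId BIGINT,
>>> >   rate DECIMAL(10, 2)
>>> > ) WITH (
>>> >   'connector' = 'jdbc',
>>> >   'table-name' = 'currency_rates',
>>> >   ...
>>> > )
>>> >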
>>> > Seth
>>> >
>>> > On Mon, Jul 6, 2020 at 8:56 AM Seth Wiesman <[hidden email]> wrote:
>>> > Hi Leonard,
>>> >
>>> > Regarding DELETE operations I tend to have the opposite reaction. I
>>> spend a lot of time working with production Flink users across a large
>>> number of organizations, and saying we don't support temporal tables that
>>> include DELETEs would be a blocker for adoption. Even organizations that
>>> claim to never delete rows still occasionally do so, per GDPR requests or
>>> other regulations.
>>> >
>>> > I actually do think users will understand the limitations. Flink today
>>> has a very clear value proposition around correctness: your results are as
>>> correct as the input data provided. This does not change under support for
>>> DELETE records. Flink is providing the most correct results possible based
>>> on the resolution of the fields as generated by 3rd party systems. As
>>> Debezium and other CDC libraries become more accurate, so will Flink.
>>> >
>>> > Seth
>>> >
>>> > On Fri, Jul 3, 2020 at 11:00 PM Leonard Xu <[hidden email]> wrote:
>>> > Hi, Konstantin
>>> >
>>> >> Would we support a temporal join with a changelog stream with event
>>> >> time semantics by ignoring DELETE messages, or would it be completely
>>> >> unsupported?
>>> >
>>> > I don't know what percentage of temporal scenarios this feature covers.
>>> >
>>> > Compared to supporting an approximate event-time join by ignoring
>>> DELETE messages or by extracting an approximate event time for DELETE
>>> messages, I'm not sure this is acceptable for users even if we have
>>> explained the limitations of the approximate event-time join. I tend not
>>> to support this feature, because we cannot ensure event-time semantics and
>>> it may lead to incorrect results for users in some scenarios.
>>> >
>>> > If the percentage is high enough and most use cases can accept the
>>> approximate event time, I'm OK with supporting it for usability, although
>>> it doesn't implement event-time semantics strictly.
>>> >
>>> > Cheers,
>>> > Leonard Xu
>>> >
>>> >
>>>
>>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>

Re: [DISCUSS] [FLINK-16824] Creating Temporal Table Function via DDL

Leonard Xu
Hi, all

I've opened a new discussion for FLIP-132 [1], which is based on our consensus in the current thread.

Let's keep the discussion in the new thread; please let me know if you have any concerns.

Best
Leonard
[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-132-Temporal-Table-DDL-td43483.html

