Hi, dear Flink community ~
Data integration is one of the most popular use cases for Flink. From the SQL point of view, we aim to design an API for SQL users doing data ingestion. We have looked at several RDBMSs, and there is a classic CTAS syntax for copying table data [1][2][3][4]. We decided to use the CTAS syntax for Flink data ingestion use cases too.

*The Syntax*

    CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
      [COMMENT table_comment]
      [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
      WITH (key1=val1, key2=val2, ...)
      [ LIKE source_table [( <like_options> )] ]
      [ AS <query> ]

    <query>:
      TABLE table_name | select_statement

Explanation:
- AS TABLE: copies the source table data into the target; schema evolution is supported.
- AS select_statement: copies the source table data into the target; schema evolution is disabled, i.e. the schema is fixed at compile time.
- The SELECT statement should alias each expression; if an expression has no explicit alias, an error is thrown (auto-generated names are hard to use and confusing).
- AS and LIKE must not both be present, because their functionality conflicts.
- By default, the WITH options are not copied, because most of the time the options differ between source and sink.

We disable schema evolution for AS SELECT to support non-evolution use cases and to avoid unclear semantics for SQL transformations, for example a SELECT * in a join query.

*The Use Case*

Here is an example:

    CREATE TABLE dlf_catalog.default.hudi_sink
    PARTITIONED BY (ts)
    WITH (
      'connector' = 'hudi',
      'table.type' = 'MERGE_ON_READ'
    )
    AS TABLE mysql_cdc_catalog.default.mysql_source;

This statement creates a Hudi sink table with the same schema as the MySQL source table, partitioned by the field "ts".
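The rules in the Explanation list can be read as validation steps on a parsed statement. The following is a minimal Python sketch, not Flink code: `CtasStatement`, `SelectItem`, `CtasPlan` and `plan_ctas` are hypothetical names, and letting a bare column reference keep its own name (instead of requiring an alias) is an assumption the proposal does not state.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SelectItem:
    """One item of a SELECT list, e.g. `price * qty AS total` (hypothetical model)."""
    expression: str
    alias: Optional[str] = None


@dataclass
class CtasStatement:
    """Hypothetical parsed form of the proposed CREATE TABLE ... AS statement."""
    table_name: str
    with_options: dict = field(default_factory=dict)
    like_source: Optional[str] = None   # LIKE source_table
    as_table: Optional[str] = None      # AS TABLE table_name
    as_select: Optional[list] = None    # AS select_statement (list of SelectItem)


@dataclass
class CtasPlan:
    column_names: list
    options: dict
    schema_evolution: bool


def plan_ctas(stmt: CtasStatement, source_columns: list) -> CtasPlan:
    """Apply the rules from the Explanation list and derive the target table."""
    has_as = stmt.as_table is not None or stmt.as_select is not None
    if has_as and stmt.like_source is not None:
        raise ValueError("AS and LIKE cannot be used together")
    if stmt.as_table is not None:
        # AS TABLE: copy the source schema and keep schema evolution enabled.
        columns = list(source_columns)
        evolution = True
    elif stmt.as_select is not None:
        columns = []
        for item in stmt.as_select:
            if item.alias is not None:
                columns.append(item.alias)
            elif item.expression.isidentifier():
                # Assumption: a bare column reference keeps its own name.
                columns.append(item.expression)
            else:
                # Reject instead of inventing an auto-generated name.
                raise ValueError(f"expression '{item.expression}' needs an explicit alias")
        # AS SELECT: the schema is fixed at compile time.
        evolution = False
    else:
        raise ValueError("this sketch only covers CREATE TABLE ... AS")
    # WITH options are not copied from the source: only the statement's own options apply.
    return CtasPlan(columns, dict(stmt.with_options), evolution)


# Usage, mirroring the hudi_sink example (the source column names are made up).
stmt = CtasStatement(
    "hudi_sink",
    {"connector": "hudi", "table.type": "MERGE_ON_READ"},
    as_table="mysql_cdc_catalog.default.mysql_source",
)
plan = plan_ctas(stmt, ["id", "name", "ts"])
print(plan.column_names, plan.schema_evolution)  # prints ['id', 'name', 'ts'] True
```

Raising on a missing alias, rather than generating a name such as `EXPR$0`, follows the proposal's reasoning that auto-generated names are hard to use.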
Here is a use case:

    CREATE CATALOG mysql_cdc WITH (type=mysql, instance=instance1, db=db1 ...);
    CREATE CATALOG dlf_datalake WITH (type = dlf, endpoint='http://xxx', accessKey='abc');
    USE CATALOG dlf_datalake;
    CREATE TABLE ods_hudi_table
    PARTITIONED BY (ts)
    WITH (connector='hudi', ...)
    AS TABLE mysql_table; -- the sync pipeline is started from here

*About the Atomicity*

The statement performs 2 actions:
1. create the table
2. start the data sync pipeline

The Flink catalog does not ensure atomicity, so we do not keep atomicity here either:
- If the table already exists at creation time and the [IF NOT EXISTS] keywords are not specified, an exception is thrown.
- If the table is created but the write fails, the table is not dropped; the user should drop it manually.

*Q & A*

1. Some catalogs do not support storing custom WITH options.
The catalog should throw directly, so the statement execution fails.

2. How to write data into an existing table with historical data?
Declare the [IF NOT EXISTS] keywords: the table creation is skipped, but the pipeline still starts up.

3. How to match sharded databases and tables (sub-database and sub-table)?
Use a regex-style source table name, such as:

    CREATE TABLE dlf_catalog.default.hudi_sink
    PARTITIONED BY (ts)
    WITH (
      'connector' = 'hudi',
      'table.type' = 'MERGE_ON_READ'
    )
    AS TABLE `instant1.db1.*.user_*`;

This statement would sync all databases with "db1" as prefix and all tables with "user_" as prefix into the target table.

4. What if the user defines a new field as non-nullable with no default value, but we want schema evolution for it?
An exception is thrown, because we do not know how to fill in values for a field whose nullability is false.
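The two non-atomic actions, the IF NOT EXISTS behavior from Q & A 2, and the nullability rule from Q & A 4 can be modeled in a few lines. This is a minimal sketch under stated assumptions: a plain Python dict stands in for the catalog and a callback stands in for the sync pipeline; `execute_ctas`, `add_column`, `Column` and `TableAlreadyExists` are hypothetical names, not Flink APIs.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Column:
    name: str
    data_type: str
    nullable: bool = True
    default: Optional[str] = None


class TableAlreadyExists(Exception):
    pass


def execute_ctas(catalog: dict, name: str, columns: list, if_not_exists: bool,
                 start_pipeline: Callable[[str], None]) -> None:
    """Action 1: create the table. Action 2: start the sync pipeline. Not atomic."""
    if name in catalog:
        if not if_not_exists:
            raise TableAlreadyExists(name)
        # IF NOT EXISTS: skip creation, but still start the pipeline (Q & A 2).
    else:
        catalog[name] = list(columns)
    # If the pipeline fails, the table created above is NOT dropped;
    # the user has to drop it manually.
    start_pipeline(name)


def add_column(catalog: dict, name: str, column: Column) -> None:
    """Propagate an ADD COLUMN from the source to the sink table (Q & A 4)."""
    if not column.nullable and column.default is None:
        raise ValueError(
            f"cannot evolve '{name}': column '{column.name}' is NOT NULL without a default")
    catalog[name].append(column)


# Usage: the pipeline fails after the table was created; the table survives.
catalog = {}

def failing_pipeline(table: str) -> None:
    raise RuntimeError(f"write into {table} failed")

try:
    execute_ctas(catalog, "ods_hudi_table", [Column("id", "BIGINT", nullable=False)],
                 if_not_exists=False, start_pipeline=failing_pipeline)
except RuntimeError as e:
    print(e)                            # prints: write into ods_hudi_table failed
print("ods_hudi_table" in catalog)      # prints: True
```

Keeping creation and pipeline start as two plain steps, with no compensating drop, mirrors the proposal's choice not to promise atomicity the catalog cannot provide.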
Looking forward to your ideas, thanks ~

[1] PostgreSQL 13: https://www.postgresql.org/docs/13/sql-createtableas.html
[2] Spark 3.0: https://spark.apache.org/docs/3.0.0/sql-ref-syntax-ddl-create-table-datasource.html
[3] Hive 3.0: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateTableCreate/Drop/TruncateTable
[4] MySQL 5.6: https://dev.mysql.com/doc/refman/5.6/en/create-table-select.html

Best,
Danny Chan
Hi Danny,
Thanks for starting this discussion.
Big +1 for this feature. Both CTAS and CREATE TABLE LIKE are very useful features. IMO, it is clearer to separate them into two parts of the syntax. 😀
First, I have two related questions:

1. Would `create table` in CTAS trigger the creation of a physical table in the external storage system?
For example, a normal `create table` currently only defines a connection to an existing external Kafka topic; it does not create a physical Kafka topic in the Kafka cluster. Does this behavior still hold for CTAS and CREATE TABLE LIKE?

2. Would the data sync of CTAS run continuously if the SELECT works on an unbounded source?
In Flink, the sub-select query may work on an unbounded source, which is different from other systems (Postgres, Spark, Hive, MySQL). Does the data sync run continuously, or does it only sync a snapshot taken at job submission?
Besides, I have some minor questions about points mentioned in your email.
> how to write data into existing table with history data declare [IF NOT EXISTS] keywords and we ignore the table creation but the pipeline still starts up
Maybe we should compare the old schema with the new one. What would happen if the schema of the existing table differs from the new schema?
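A check along these lines could compare the existing table's columns with the schema the CTAS query would produce. The sketch below is hypothetical (plain dicts mapping column names to type strings, and an illustrative `schema_diff` helper) and only reports differences; whether a difference should fail the statement or be tolerated is exactly the open question.

```python
def schema_diff(existing: dict, derived: dict) -> list:
    """Compare two {column_name: type} maps and describe every difference."""
    problems = []
    for name, data_type in derived.items():
        if name not in existing:
            problems.append(f"missing in existing table: {name}")
        elif existing[name] != data_type:
            problems.append(f"type mismatch for {name}: {existing[name]} vs {data_type}")
    for name in existing:
        if name not in derived:
            problems.append(f"extra in existing table: {name}")
    return problems


# Usage: an existing sink that predates a type change and a new column.
existing = {"id": "INT", "name": "STRING"}
derived = {"id": "BIGINT", "name": "STRING", "ts": "TIMESTAMP(3)"}
for problem in schema_diff(existing, derived):
    print(problem)
# prints:
# type mismatch for id: INT vs BIGINT
# missing in existing table: ts
```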
> How to match sub-database and sub-table ? Use regex style source table name
1. What would happen if the schemas of the matched tables differ from each other?
2. In what order is the data of the matched tables synced: one table after another, or all at the same time?
> AS select_statement: copy source table data into target
Users could explicitly specify the data type of each column in a CTAS statement. What would happen for the example in the MySQL documentation (https://dev.mysql.com/doc/refman/5.6/en/create-table-select.html), whose result is a bit unexpected? I wonder what the behavior would be in Flink.
Best,
JING ZHANG
Thanks, Danny, for starting the discussion on extending the CTAS syntax.
I think this is a very useful feature for data integration and ETL jobs (a big use case of Flink). Many users complain that manually defining schemas for sources and sinks is hard. CTAS helps users write ETL jobs without defining any source or sink schemas. CTAS automatically creates physical tables in external systems, and automatically maps external tables to Flink tables with the help of catalogs (e.g. PgCatalog, HiveCatalog).

On the other hand, the schema of a SELECT query is fixed at compile time. CTAS TABLE extends the syntax to allow a dynamic schema at runtime; semantically, it continuously copies the up-to-date structure and data (if run in streaming mode). So I think CTAS TABLE is a major step forward for data integration: it defines a syntax that allows the underlying streaming pipeline to automatically propagate schema changes (e.g. ADD COLUMN) from source tables to sink tables without stopping jobs or updating SQL.

Therefore, I'm +1 for the feature.

Best,
Jark

On Fri, 28 May 2021 at 16:22, JING ZHANG wrote:
> Hi Danny,
> ...
Hi everyone,
quick question for my understanding: how is this different from

    CREATE TABLE IF NOT EXISTS my_table (
      ...
    ) WITH (
      ...
    );
    INSERT INTO my_table SELECT ...;

?

Is it only about a) not having to specify the schema and b) a more condensed syntax?

Cheers,

Konstantin

On Fri, May 28, 2021 at 11:30 AM Jark Wu wrote:
> Thanks Danny for starting the discussion of extending CTAS syntax.
> ...

--
Konstantin Knauf
https://twitter.com/snntrable
https://github.com/knaufk
Hi Konstantin,
From my understanding, this syntax has 2 major benefits:

1. Just like you said, it saves the effort of specifying the schema, especially when hundreds of fields are involved.
2. When using CREATE TABLE xx AS TABLE yy, it gives us the possibility to enable schema evolution, and it seems pretty natural to do so.

Best,
Kurt

On Fri, May 28, 2021 at 5:44 PM Konstantin Knauf wrote:
> Hi everyone,
> ...
Thanks Danny for starting the discussion.
+1 for this feature. I like schema evolution.

A question: can we support multiple pipelines? For example, I have three source tables and just one sink table, so I will write three CTAS statements:
- CTAS IF NOT EXISTS 1
- CTAS IF NOT EXISTS 2
- CTAS IF NOT EXISTS 3

Is there a way to configure different sink options for each of them? Something like dynamic table options?

Best,
Jingsong

On Fri, May 28, 2021 at 10:10 PM Kurt Young wrote:
> Hi Konstantin,
> ...

--
Best, Jingsong Lee