Hi everybody,
I'd like to propose and discuss some API changes to support DML operations like the 'INSERT INTO' clause in the Table API & SQL. Originally this was discussed with Fabian in the PR conversation (see https://github.com/apache/flink/pull/3829); since it involves several API changes, I'm starting this mailing list thread to discuss it.

# Motivation

Currently the Table API only has registration methods for source tables, so when we write a streaming job in SQL we have to add additional code for the sink, just like the Table API does:

    val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
    val t = StreamTestData.getSmall3TupleDataStream(env)
    tEnv.registerDataStream("MyTable", t)

    // one way: invoke the Table API's writeToSink method directly
    val result = tEnv.sql(sqlQuery)
    result.writeToSink(new YourStreamSink)

    // another way: convert to a DataStream first and then invoke addSink
    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    result.addSink(new StreamITCase.StringSink)

From the API we can see that the sink table is always a derived table, because its schema is inferred from the result type of the upstream query.

In a traditional RDBMS, which supports DML syntax, a query with a target output could be written like this:

    INSERT INTO target_table_name [(column_name [, ...n])]
    query

The equivalent form of the example above is as follows:

    tEnv.registerTableSink("targetTable", new YourSink)
    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
    val result = tEnv.sql(sql)

This is supported by Calcite's grammar:

    insert:
      ( INSERT | UPSERT ) INTO tablePrimary
      [ '(' column [, column ]* ')' ]
      query

I'd like to extend the Flink Table API to support this feature.

# Proposed changes

1. Support registering a sink table (like source table registration; validation will be done against the registered table):

    /**
     * Registers an external [[TableSink]] in this [[TableEnvironment]]'s catalog.
     * Registered sink tables can be referenced in SQL DML clauses.
     *
     * @param name The name under which the [[TableSink]] is registered.
     * @param tableSink The [[TableSink]] to register.
     */
    def registerTableSink(name: String, tableSink: TableSink[_]): Unit

2. Add two new methods to table.scala:

    def insertInto[T](tableSink: String): Unit
    def insertInto[T](tableSink: String, conf: QueryConfig): Unit

I propose to retain the current writeToSink method so that this is not a breaking change to the API. In a sense, it is similar to the SQL 'CREATE TABLE AS' statement in an RDBMS (which creates a table from an existing table by copying the existing table's columns).

3. Deprecate the current sql method and add two new methods to TableEnvironment:

    @deprecated def sql(sql: String): Table
    def sqlQuery(sql: String): Table
    def sqlUpdate(sql: String, config: QueryConfig): Unit

I think the sqlUpdate method here is different from JDBC's executeUpdate [1], which returns an int value: sqlUpdate will not trigger execution immediately, so keeping the return type as Unit sounds reasonable and does not break the consistency of the Scala and Java APIs.

Note that:
- A registered source table cannot be updated unless it is registered as a sink table as well, so we need to add validation in both the Table API and SQL to prevent querying a sink table or inserting into a source table.
- Partial column insertion into a target table is not supported for now, because tables do not yet have nullable property definitions for their columns.

Ref:
[1] https://docs.oracle.com/javase/7/docs/api/java/sql/PreparedStatement.html

doc link: https://goo.gl/n3phK5

What do you think?

Best,
Lincoln
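P.S. To make the proposed usage concrete, here is a rough end-to-end sketch of how the new methods could be used together. This is a minimal sketch of the proposal only, not working code: registerTableSink, insertInto, sqlQuery, and sqlUpdate do not exist yet, YourSink stands for a user-defined TableSink as in the examples above, and tEnv.queryConfig is assumed to provide a suitable QueryConfig.

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // register a source table, as today
    val input = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"))
    tEnv.registerDataStream("sourceTable", input, 'a, 'b, 'c)

    // 1. register a sink table under a name (proposed)
    tEnv.registerTableSink("targetTable", new YourSink)

    // 2. Table API: emit a query result into the registered sink (proposed)
    tEnv.sqlQuery("SELECT a, b, c FROM sourceTable")
      .insertInto("targetTable")

    // 3. SQL DML: the INSERT INTO statement returns no Table (proposed)
    tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c FROM sourceTable",
      tEnv.queryConfig)

    env.execute()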
Hi Lincoln,
thank you for this proposal and for discussing the motivation for this change. I think it makes a lot of sense (as you said, we discussed this before).

I'd like to highlight the breaking change (among several non-breaking changes) proposed here: we propose to deprecate TableEnvironment.sql(sql: String): Table and replace it with TableEnvironment.sqlQuery(sql: String): Table.

The reasons for this change are:
- We need a sqlUpdate() method that does not return a Table. For now the use case is "INSERT INTO x SELECT ...", but there are other DML statements as well.
- To better distinguish query and update functionality, we would like to rename the sql() method to sqlQuery().
- We want to name the SQL methods similarly to their JDBC counterparts. In JDBC the methods are executeQuery(): ResultSet and executeUpdate(): int. Since the Table API is not only SQL, we think sqlQuery() and sqlUpdate() are good method names for this functionality.

What do others think?

Fabian
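P.S. To make the JDBC analogy concrete, here is a minimal side-by-side sketch. The JDBC half uses the real java.sql API (the connection URL is a placeholder); the Table API half follows the proposed signatures and assumes a tEnv with source and sink tables registered as in Lincoln's examples.

    import java.sql.DriverManager

    // JDBC: a query returns a ResultSet, an update returns an int row count
    val conn = DriverManager.getConnection("jdbc:h2:mem:test")
    val rs = conn.createStatement().executeQuery("SELECT a FROM t")
    val rowCount = conn.createStatement().executeUpdate("INSERT INTO t VALUES (1)")

    // Proposed Table API counterparts: sqlQuery returns a Table, while sqlUpdate
    // returns Unit because nothing executes until the job is submitted
    val table = tEnv.sqlQuery("SELECT a, b, c FROM sourceTable")
    tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c FROM sourceTable",
      tEnv.queryConfig)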
+1 to supporting this change, as it makes the SQL API more accurate and
elegant. I hope it will not introduce too much trouble for existing Flink SQL users when they upgrade to a new release.