Hi everyone,
Leonard and I have prepared a FLIP about refactoring the current Descriptor API, i.e. TableEnvironment#connect(). We would like to propose a new descriptor API to register connectors in the Table API.

Since Flink 1.9, the community has focused more on the new SQL DDL feature. After a series of releases, the SQL DDL is powerful and has many rich features now. However, the Descriptor API (`TableEnvironment#connect()`) has been stagnant for a long time and is missing lots of core features, such as computed columns and primary keys. That's frustrating for Table API users who want to register tables programmatically. Besides, currently a connector must implement a corresponding Descriptor (e.g. `new Kafka()`) before it can be used with the "connect" API. We hope to remove this effort for connector developers, so that custom sources/sinks can be registered via the descriptor API without implementing a Descriptor.

These are the problems we want to resolve in this FLIP. I'm looking forward to your comments.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connector+for+Table+API

Best,
Jark
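PS: for context, this is roughly what registering a Kafka + JSON table looks like with the current connect() API (the existing org.apache.flink.table.descriptors classes); note that there is no place to declare computed columns or primary keys here:

    tableEnv.connect(
            new Kafka()
                .version("0.11")
                .topic("user_logs")
                .property("bootstrap.servers", "localhost:9092")
                .property("group.id", "test-group"))
        .withFormat(new Json())
        .withSchema(
            new Schema()
                .field("user_id", DataTypes.BIGINT())
                .field("score", DataTypes.DECIMAL(10, 2))
                .field("log_ts", DataTypes.TIMESTAMP(3)))
        .createTemporaryTable("MyUserTable");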
Hi Jark,

Thanks for starting the discussion. I think this is an important effort. I really like the general concept, but I have a few comments on the details of the FLIP.

1) I don't see any benefit in differentiating computedColumn vs column in the method name. It only adds cognitive burden. Can we simply have column in both cases?

2) I think we should use the Expression DSL for defining the expressions of computed columns instead of just pure strings.

3) I am not convinced of having the Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps methods. That would again make it different from the SQL DDL, where we write "proctime AS proctime()" and "WATERMARK FOR rowtime AS rowtime - INTERVAL '3' SECOND" respectively. Even if we do provide such helper methods, I think the SQL way should be the recommended approach. I would rather see it as:

    Schema()
        .column("proctime", proctime())
        .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds()))
        // or, once we properly support interval types:
        .watermarkFor("rowtime", $("rowtime").minus(Duration.ofSeconds(3)))

4) I think the section about the LIKE clause requires a second pass. The example is wrong. Moreover, I am not sure what LikeOption.INCLUDING.ALL is. Is this a constant? Is this some kind of a builder?

5) The classes like TableDescriptor/Schema described in the FLIP cover the user-facing helper methods. They do not define the contract which the planner/TableEnvironment expects, though. How will the planner use those classes to create actual tables? From the point of view of TableEnvironment#createTable or TableEnvironment#from, none of the methods listed in the FLIP are necessary. At the same time, there are no methods to retrieve the Schema/Options etc. which are required to actually create the Table.

6) Lastly, how about we try to remove the new keyword from the syntax? Personally I'd very much prefer to have it like:

    Schema.column()
        .column()

or

    .type("kafka") // I am not strong on the "type" keyword here. I can also see for/of/...

Best,
Dawid
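PS: to make point 2 concrete, the Expression DSL variant could simply reuse the expressions the Table API already ships ($ comes from org.apache.flink.table.api.Expressions); the column(...) signature below is the FLIP's proposal, so this is only a sketch:

    // string-based, as the FLIP currently proposes:
    .column("ts", "TO_TIMESTAMP(log_ts)")

    // Expression DSL, as suggested above: no SQL parsing, IDE support, and no need
    // to register a function by name before referencing it
    .column("ts", $("log_ts").toTimestamp())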
Correction to my point 4. The example is correct. I did not read it
carefully enough. Sorry for the confusion. Nevertheless I'd still like to see a bit more explanation on the LikeOptions.
Hi Dawid,
Thanks for the great feedback! Here are my responses:

1) computedColumn(..) vs column(..)
I'm fine with using `column(..)` in both cases.

2) Expression DSL vs pure SQL string for computed columns
This is a good point. Actually, I also prefer to use the Expression DSL because it is more Table API style. However, this requires modifying TableSchema again to accept & expose Expression as computed columns. I'm not convinced about this, because AFAIK we want to have a CatalogTableSchema to hold this information and don't want to extend TableSchema. Maybe Timo can give some points here. Besides, this would mean the descriptor API can't be persisted in the Catalog unless FLIP-80 is done.

3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps
The original intention behind these APIs was to provide shortcut APIs for Table API users. But I'm also fine with only providing the DDL-like methods if you have concerns. We can discuss shortcuts in the future if users request them.

4) LikeOption
LikeOption.INCLUDING.ALL is a constant (an enum value). I have added more description about this to the FLIP.

5) Implementation?
I didn't want to mention too many implementation details in the FLIP at the beginning, because the API section is already very long. But I have added an "Implementation" section to explain them.

6) static method vs new keyword
Personally I prefer the new keyword because it makes the API cleaner. If we want to remove the new keyword and use static methods, we have to either add a `Schema.builder()/create()` method as the starting method, or duplicate all the methods as static methods. E.g. we have 12 methods in `Kafka`; any of them can be a starting method, so we would end up with 24 methods in `Kafka`. Both options are not good, and it's hard to keep all the descriptors using the same starting method name, whereas all the descriptors can start from the same new keyword.

Best,
Jark
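PS: to answer the "constant vs builder" question more concretely, the shape sketched in the FLIP is roughly an interface with nested enums, mirroring the feature options of the SQL LIKE clause; the authoritative definition (and the exact list of values) is in the FLIP:

    public interface LikeOption {
        enum INCLUDING implements LikeOption {
            ALL, CONSTRAINTS, GENERATED, OPTIONS, PARTITIONS, WATERMARKS
        }
        enum EXCLUDING implements LikeOption {
            ALL, CONSTRAINTS, GENERATED, OPTIONS, PARTITIONS, WATERMARKS
        }
        enum OVERWRITING implements LikeOption {
            GENERATED, OPTIONS, WATERMARKS
        }
    }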
Hi Jark,
thanks for working on this issue. It is time to fix this last part of inconsistency in the API. I also like the core parts of the FLIP, esp. that TableDescriptor is one entity that can be passed to different methods. Here is some feedback from my side:

1) +1 for just `column(...)`

2) Expression DSL vs pure SQL string for computed columns
I agree with Dawid. Using the Expression DSL is desirable for a consistent API. Furthermore, otherwise people need to register functions if they want to use them in an expression. Refactoring TableSchema is definitely on the list for 1.12. Maybe we can come up with some intermediate solution where we transform the expression into a SQL expression for the catalog, until the discussions around FLIP-80 and CatalogTableSchema have been finalized.

3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps
We should design the descriptor very close to the SQL syntax. The more similar the syntax, the more likely it is that the new descriptor API stays stable.

6) static method vs new keyword
Actually, the `new` keyword was one of the things that bothered me most in the old design. Fluent APIs avoid this nowadays.

7) make the descriptors immutable with builders
The descriptors are some kind of builders already, but they are not called "builder". Instead of coming up with the new concept of a "descriptor", we should use terminology that people, esp. Java/Scala users, are already familiar with. We could also make the descriptors immutable to pass them around easily.

Btw, "Connector" and "Format" should always be in the class name. This was also a mistake in the past. Instead of calling the descriptor just `Kafka` we could call it `KafkaConnector`. An entire example could look like:

    tEnv.createTemporaryTable(
        "OrdersInKafka",
        KafkaConnector.newBuilder() // builder pattern supported by IDE
            .topic("user_logs")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "test-group")
            .format(JsonFormat.newInstance()) // shortcut for no parameters
            .schema(
                Schema.newBuilder()
                    .column("user_id", DataTypes.BIGINT())
                    .column("score", DataTypes.DECIMAL(10, 2))
                    .column("log_ts", DataTypes.TIMESTAMP(3))
                    .column("my_ts", toTimestamp($("log_ts")))
                    .build())
            .build());

Instead of refactoring the existing classes, we could also think about a completely new stack. I think this would avoid confusion for the old users. We could deprecate the entire `Kafka` class instead of dealing with backwards compatibility.

8) minor extensions
A general `Connector.option(...)` method should also accept `ConfigOption` instead of only strings.
`Schema.column()` should accept `AbstractDataType`, which can be resolved to a `DataType` by access to a `DataTypeFactory`.

What do you think?

Thanks,
Timo
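PS regarding 8): what I mean is simply a typed overload next to the string-based one, so option constants can be passed directly. A minimal sketch; the class name and storage below are illustrative, not final signatures:

    import org.apache.flink.configuration.ConfigOption;

    import java.util.HashMap;
    import java.util.Map;

    class ConnectorOptions {

        private final Map<String, String> options = new HashMap<>();

        // string-based variant, as in the current FLIP
        ConnectorOptions option(String key, String value) {
            options.put(key, value);
            return this;
        }

        // typed variant: key and stringified value are derived from the ConfigOption
        <T> ConnectorOptions option(ConfigOption<T> option, T value) {
            options.put(option.key(), String.valueOf(value));
            return this;
        }
    }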
Thanks Jark for bringing up this discussion and organizing the FLIP document.

Thanks Dawid and Timo for the feedback. Here are my thoughts.

1) I'm +1 with using column() for both cases.

2) Expression DSL vs pure SQL string for computed columns
I think we can support both and implement the pure SQL string first. I agree that the Expression DSL brings more possibility and flexibility, but a SQL string is a more unified way that can reuse most of the DDL logic, like validation and persisting in the Catalog. Converting the Expression DSL to a SQL expression is another big topic, and I have not figured out a feasible idea yet. So maybe we can postpone the Expression DSL support considering the reality.

3) Methods Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps
+1 with Dawid's proposal to offer SQL-like methods:

    Schema()
        .column("proctime", proctime())
        .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds()))

And we can simplify watermarkFor("colName", Expression watermarkStrategy) to watermark("colName", Expression watermarkStrategy); I think the latter already expresses the meaning of "WATERMARK FOR column_name AS watermark_strategy_expression" well.

5)6)7) The new keyword vs the static method vs builder pattern
I have no strong tendency; the new keyword and the static method on a descriptor can nearly be treated as a builder and do the same things as a builder. For the builder pattern, we would introduce six extra methods (Connector.Builder(), Connector.Builder.build(), Format.Builder(), Format.Builder.build(), Schema.Builder(), Schema.Builder.build()); I think we could avoid these unnecessary methods. I'm slightly +1 for the new keyword if we need a choice.

8) `Connector.option(...)` should also accept `ConfigOption`
I'm slightly -1 for this. ConfigOption may not work because the key of a format ConfigOption has no format prefix, e.g. FAIL_ON_MISSING_FIELD of JSON: we need "json.fail-on-missing-field" rather than "fail-on-missing-field".

    public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = ConfigOptions
        .key("fail-on-missing-field")
        .booleanType()
        .defaultValue(false);

WDYT?

Best,
Leonard Xu
Thanks for the discussion.
The Descriptor API has been lacking watermark and computed column support for too long.

1) +1 for just `column(...)`

2) +1 for being consistent with the Table API; the Java Table API should be the Expression DSL. We don't need pure string support, users should just use DDL instead. I think this is just a schema descriptor? The schema descriptor should be consistent with DDL, so it should definitely contain computed column information.

3) +1 for not containing Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps; we can just leave them in the legacy APIs.

6,7) +1 for removing "new", using builders, and making the descriptors immutable. For Jark: only the starting method needs to be static, the others do not.

8) +1 for `ConfigOption`, this can be consistent with `WritableConfig`. For Leonard: I don't think the user needs "json.fail-on-missing-field"; the user should need "fail-on-missing-field" rather than "json.fail-on-missing-field". The recommended way is "JsonFormat.newInstance().option(....)", i.e. options should be configured in the format's own scope.

Best,
Jingsong
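PS: what I mean by the format scope, sketched against the proposed API (all class and option names below are illustrative, not final):

    // options are set on the format descriptor itself, so no "json." prefix is needed;
    // the surrounding connector adds the "json." prefix when it assembles the final options
    FormatDescriptor json = JsonFormat.newInstance()
        .option(JsonOptions.FAIL_ON_MISSING_FIELD, true);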
Thank you all for the discussion!
Here are my comments:

2) I agree we should support Expression as a computed column. But I'm in favor of Leonard's point that maybe we can also support a SQL string expression as a computed column, because it keeps aligned with DDL. The concern with Expression is that converting an Expression to a SQL string, or (de)serializing an Expression, is another topic that is not clear yet and may involve lots of work. Maybe we can support Expression later if time permits.

6,7) I still prefer the "new" keyword over a builder. I don't think immutability is a strong reason. I care more about usability and experience from the users' and devs' perspective.
- Users need to type more words when using a builder: `KafkaConnector.newBuilder()...build()` vs `new KafkaConnector()...`
- It's more difficult for developers to write a descriptor: 2 classes (KafkaConnector and KafkaConnectorBuilder) + 5 more methods (builders, schema, partitionedBy, like, etc.). With the "new" keyword all the common methods are defined by the framework.
- It's hard to have the same API style across different connectors, because the common methods are defined by users. For example, some may have `withSchema`, `partitionKey`, `withLike`, etc.

8) I'm -1 on `ConfigOption`. The ConfigOption is not used on `JsonFormat`, but on the generic `Connector#option`. This doesn't work when using format options:

    new Connector("kafka")
        .option(JsonOptions.IGNORE_PARSE_ERRORS, true);
    // this is wrong, because "kafka" requires "json.ignore-parse-errors" as the option key,
    // not "ignore-parse-errors"

========================================

Hi Timo, regarding having a completely new stack: I have thought about that, but I still prefer to refactor the existing stack. I think it will be more confusing if users see two similar stacks, and they may run into many problems if they use the wrong class. For example, we would have two `Schema` and `TableDescriptor` classes; the `KafkaConnector` can't be used in the legacy `connect()` API, and the legacy `Kafka` class can't be used in the new `createTemporaryTable()` API. Besides, the existing API has been deprecated in 1.11, so I think it's fine to remove it in 1.12.

Best,
Jark
Hi all,
After some offline discussion with other people, I'm also fine with using the builder pattern now, even though I still think the `.build()` method is a little verbose in the user code.

I have updated the FLIP with the following changes:
1) use the builder pattern instead of the "new" keyword. In order to avoid duplicate code and reduce the development burden for connector developers, I introduced the abstract classes `TableDescriptorBuilder` and `FormatDescriptorBuilder`. All the common methods are pre-defined in the base builder classes; all custom descriptor builders should extend from them. We can also add more methods to the base builder classes in the future without changes in the connectors.
2) use Expression instead of SQL expression strings for computed columns and watermark strategies
3) use `watermark(rowtime, expr)` as the watermark method
4) `Schema.column()` accepts `AbstractDataType` instead of `DataType`
5) drop Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps

A full example will look like this:

    tEnv.createTemporaryTable(
        "MyTable",
        KafkaConnector.newBuilder()
            .version("0.11")
            .topic("user_logs")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "test-group")
            .startFromEarliest()
            .sinkPartitionerRoundRobin()
            .format(JsonFormat.newBuilder().ignoreParseErrors(false).build())
            .schema(
                Schema.newBuilder()
                    .column("user_id", DataTypes.BIGINT())
                    .column("user_name", DataTypes.STRING())
                    .column("score", DataTypes.DECIMAL(10, 2))
                    .column("log_ts", DataTypes.STRING())
                    .column("part_field_0", DataTypes.STRING())
                    .column("part_field_1", DataTypes.INT())
                    .column("proc", proctime()) // define a processing-time attribute with column name "proc"
                    .column("ts", toTimestamp($("log_ts")))
                    .watermark("ts", $("ts").minus(lit(3).seconds()))
                    .primaryKey("user_id")
                    .build())
            .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned tables yet, this is just an example for the API
            .build());

I hope this resolves all your concerns. Further feedback is welcome!

Updated FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API#FLIP129:RefactorDescriptorAPItoregisterconnectorsinTableAPI-FormatDescriptor&FormatDescriptorBuilder

POC:
https://github.com/wuchong/flink/tree/descriptor-POC/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptor3

Best,
Jark
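PS: for connector developers, the intended pattern is roughly the following. This is only a simplified sketch; the exact generics, method set, and the Schema/TableDescriptor types are the ones proposed in the FLIP/POC, not final API:

    import java.util.HashMap;
    import java.util.Map;

    // common methods live in the framework-provided base builder
    public abstract class TableDescriptorBuilder<BUILDER extends TableDescriptorBuilder<BUILDER>> {

        private final Map<String, String> options = new HashMap<>();
        private Schema schema;

        // returns the concrete builder so the fluent chain keeps its type
        protected abstract BUILDER self();

        public BUILDER option(String key, String value) {
            options.put(key, value);
            return self();
        }

        public BUILDER schema(Schema schema) {
            this.schema = schema;
            return self();
        }

        public TableDescriptor build() {
            return new TableDescriptor(options, schema);
        }
    }

    // a connector-specific builder only adds its own shortcut methods
    class KafkaConnectorBuilder extends TableDescriptorBuilder<KafkaConnectorBuilder> {

        public KafkaConnectorBuilder topic(String topic) {
            return option("topic", topic);
        }

        @Override
        protected KafkaConnectorBuilder self() {
            return this;
        }
    }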
> With the "new" keyword all the common methods are defined by the > framework. > - It's hard to have the same API style for different connectors, because > the common methods are defined by users. For example, some may have > `withSchema`, `partitionKey`, `withLike`, etc... > > 8) I'm -1 to `ConfigOption`. The ConfigOption is not used on `JsonFormat`, > but the generic `Connector#option`. This doesn't work when using format > options. > > new Connector("kafka") > .option(JsonOptions.IGNORE_PARSE_ERRORS, true); // this is wrong, > because "kafka" requires "json.ignore-parse-errors" as the option key, not > the "ignore-parse-errors". > > > ======================================== > Hi Timo, regarding having a complete new stack, I have thought about that. > But I still prefer to refactor the existing stack. Reasons: > Because I think it will be more confusing if users will see two similar > stacks and may have many problems if using the wrong class. > For example, we may have two `Schema` and `TableDescriptor` classes. The > `KafkaConnector` can't be used in legacy `connect()` API, > the legacy `Kafka` class can't be used in the new `createTemporaryTable()` > API. > Besides, the existing API has been deprecated in 1.11, I think it's fine > to remove them in 1.12. > > > Best, > Jark > > > On Thu, 16 Jul 2020 at 15:26, Jingsong Li <[hidden email]> wrote: > >> Thanks for the discussion. >> >> Descriptor lacks the watermark and the computed column is too long. >> >> 1) +1 for just `column(...)` >> >> 2) +1 for being consistent with Table API, the Java Table API should be >> Expression DSL. We don't need pure string support, users should just use >> DDL instead. I think this is just a schema descriptor? The schema >> descriptor should be consistent with DDL, so, definitely, it should >> contain computed columns information. >> >> 3) +1 for not containing Schema#proctime and >> Schema#watermarkFor#boundedOutOfOrderTimestamps, we can just leave them in >> legacy apis. >> >> 6,7) +1 for removing "new" and builder and making it immutable, For Jark, >> the starting method is the static method, the others are not. >> >> 8) +1 for `ConfigOption`, this can be consistent with `WritableConfig`. >> For Leonard, I don't think user needs “json.fail-on-missing-field” rather >> than “fail-on-missing-field”, user should >> need “fail-on-missing-field” rather than “json.fail-on-missing-field", the >> recommended way is "JsonFormat.newInstance().option(....)", should >> configure options in the format scope. >> >> Best, >> Jingsong >> >> On Wed, Jul 15, 2020 at 9:30 PM Leonard Xu <[hidden email]> wrote: >> >>> Thanks Jark bring this discussion and organize the FLIP document. >>> >>> Thanks Dawid and Timo for the feedback. Here are my thoughts. >>> >>> 1) I’m +1 with using column() for both cases. >>> >>> 2) Expression DSL vs pure SQL string for computed columns >>> >>> I think we can support them both and implement the pure SQL String first, >>> I agree that Expression DSL brings more possibility and flexibility, but >>> using SQL string is a more unified way which can reuse most logic with DDL >>> like validation and persist in Catalog, >>> and Converting Expression DSL to SQL Expression is another big topic and >>> I did not figure out a feasible idea until now. >>> So, maybe we can postpone the Expression DSL support considered the >>> reality. >>> >>> 3) Methods Schema#proctime and >>> Schema#watermarkFor#boundedOutOfOrderTimestamps >>> >>> +1 with Dawid’s proposal to offer SQL like methods. 
>>> Schema() >>> .column("proctime", proctime()); >>> .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds())) >>> And we can simplify watermarkFor(“colName”, Expression >>> watermarkStrategy)to watermark(“colName”, Expression watermarkStrategy), I >>> think the later one has can express the meaning of “ WATERMARK FOR >>> column_name AS watermark_strategy_expression“ well. >>> >>> 5)6)7) The new keyword vs the static method vs builder pattern >>> >>> I have not strong tendency, the new keyword and the static method on >>> descriptor can nearly treated as a builder and do same things like >>> builder. >>> For the builder pattern, we will introduce six >>> methods(connector.Builder()、connector.Builder.build(), format.Builder(), >>> format.Builder.build(), Schema.Builder(),Schema.Builder.build() ),I think >>> we could reduce these unnecessary methods. I ‘m slightly +1 for new >>> keyword if we need a choice. >>> >>> 8) `Connector.option(...)` class should also accept `ConfigOption` >>> I’m slightly -1 for this, ConfigOption may not work because the key for >>> format configOption has not format prefix eg: FAIL_ON_MISSING_FIELD of >>> json, we need “json.fail-on-missing-field” rather than >>> “fail-on-missing-field”. >>> >>> public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = >>> ConfigOptions >>> .key("fail-on-missing-field") >>> .booleanType() >>> .defaultValue(false) >>> >>> WDYT? >>> >>> Best, >>> Leonard Xu >>> >>> >>> > 在 2020年7月15日,16:37,Timo Walther <[hidden email]> 写道: >>> > >>> > Hi Jark, >>> > >>> > thanks for working on this issue. It is time to fix this last part of >>> inconsistency in the API. I also like the core parts of the FLIP, esp. that >>> TableDescriptor is one entity that can be passed to different methods. Here >>> is some feedback from my side: >>> > >>> > 1) +1 for just `column(...)` >>> > >>> > 2) Expression DSL vs pure SQL string for computed columns >>> > I agree with Dawid. Using the Expression DSL is desireable for a >>> consistent API. Furthermore, otherwise people need to register functions if >>> they want to use them in an expression. Refactoring TableSchema is >>> definitely on the list for 1.12. Maybe we can come up with some >>> intermediate solution where we transform the expression to a SQL expression >>> for the catalog. Until the discussions around FLIP-80 and >>> CatalogTableSchema have been finalized. >>> > >>> > 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps >>> > We should design the descriptor very close to the SQL syntax. The more >>> similar the syntax the more likely it is too keep the new descriptor API >>> stable. >>> > >>> > 6) static method vs new keyword >>> > Actually, the `new` keyword was one of the things that bothered me >>> most in the old design. Fluent APIs avoid this nowadays. >>> > >>> > 7) make the descriptors immutable with builders >>> > The descriptors are some kind of builders already. But they are not >>> called "builder". Instead of coming up with the new concept of a >>> "descriptor", we should use terminology that people esp. Java/Scala users >>> are familiar with already. >>> > >>> > We could make the descriptors immutable to pass them around easily. >>> > >>> > Btw "Connector" and "Format" should always be in the classname. This >>> was also a mistake in the past. Instead of calling the descriptor just >>> `Kafka` we could call it `KafkaConnector`. 
An entire example could look >>> like: >>> > >>> > tEnv.createTemporaryTable( >>> > "OrdersInKafka", >>> > KafkaConnector.newBuilder() // builder pattern supported by IDE >>> > .topic("user_logs") >>> > .property("bootstrap.servers", "localhost:9092") >>> > .property("group.id", "test-group") >>> > .format(JsonFormat.newInstance()) // shortcut for no parameters >>> > .schema( >>> > Schema.newBuilder() >>> > .column("user_id", DataTypes.BIGINT()) >>> > .column("score", DataTypes.DECIMAL(10, 2)) >>> > .column("log_ts", DataTypes.TIMESTAMP(3)) >>> > .column("my_ts", toTimestamp($("log_ts")) >>> > .build() >>> > ) >>> > .build() >>> > ); >>> > >>> > Instead of refacoring the existing classes, we could also think about >>> a completly new stack. I think this would avoid confusion for the old >>> users. We could deprecate the entire `Kafka` class instead of dealing with >>> backwards compatibility. >>> > >>> > 8) minor extensions >>> > A general `Connector.option(...)` class should also accept >>> `ConfigOption` instead of only strings. >>> > A `Schema.column()` should accept `AbstractDataType` that can be >>> resolved to a `DataType` by access to a `DataTypeFactory`. >>> > >>> > What do you think? >>> > >>> > Thanks, >>> > Timo >>> > >>> > >>> > On 09.07.20 18:51, Jark Wu wrote: >>> >> Hi Dawid, >>> >> Thanks for the great feedback! Here are my responses: >>> >> 1) computedColumn(..) vs column(..) >>> >> I'm fine to use `column(..)` in both cases. >>> >> 2) Expression DSL vs pure SQL string for computed columns >>> >> This is a good point. Actually, I also prefer to use Expression DSL >>> because >>> >> this is more Table API style. >>> >> However, this requires to modify TableSchema again to accept & expose >>> >> Expression as computed columns. >>> >> I'm not convinced about this, because AFAIK, we want to have a >>> >> CatalogTableSchema to hold this information >>> >> and don't want to extend TableSchema. Maybe Timo can give some points >>> here. >>> >> Besides, this will make the descriptor API can't be persisted in >>> Catalog >>> >> unless FLIP-80 is done. >>> >> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps >>> >> The original intention behind these APIs are providing shortcut APIs >>> for >>> >> Table API users. >>> >> But I'm also fine to only provide the DDL-like methods if you have >>> >> concerns. We can discuss shortcuts in the future if users request. >>> >> 4) LikeOption >>> >> LikeOption.INCLUDING.ALL is a constant (enum values). I have added >>> more >>> >> description about this in the FLIP. >>> >> 5) implementation? >>> >> I don't want to mention too much about implementation details in the >>> FLIP >>> >> at the beginning, because the API is already very long. >>> >> But I also added an "Implementation" section to explain them. >>> >> 6) static method vs new keyword >>> >> Personally I prefer the new keyword because it makes the API cleaner. >>> If we >>> >> want remove new keyword and use static methods, we have to: >>> >> Either adding a `Schema.builder()/create()` method as the starting >>> method, >>> >> Or duplicating all the methods as static methods, e.g. we have 12 >>> methods >>> >> in `Kafka`, any of them can be a starting method, then we will have 24 >>> >> methods in `Kafka`. >>> >> Both are not good, and it's hard to keep all the descriptors having >>> the >>> >> same starting method name, but all the descriptors can start from the >>> same >>> >> new keyword. 
>>> >> Best, >>> >> Jark >>> >> On Thu, 9 Jul 2020 at 15:48, Dawid Wysakowicz <[hidden email] >>> > >>> >> wrote: >>> >>> Correction to my point 4. The example is correct. I did not read it >>> >>> carefully enough. Sorry for the confusion. Nevertheless I'd still >>> like >>> >>> to see a bit more explanation on the LikeOptions. >>> >>> >>> >>> On 07/07/2020 04:32, Jark Wu wrote: >>> >>>> Hi everyone, >>> >>>> >>> >>>> Leonard and I prepared a FLIP about refactoring current Descriptor >>> API, >>> >>>> i.e. TableEnvironment#connect(). We would like to propose a new >>> >>> descriptor >>> >>>> API to register connectors in Table API. >>> >>>> >>> >>>> Since Flink 1.9, the community focused more on the new SQL DDL >>> feature. >>> >>>> After a series of releases, the SQL DDL is powerful and has many >>> rich >>> >>>> features now. However, Descriptor API (the >>> `TableEnvironment#connect()`) >>> >>>> has been stagnant for a long time and missing lots of core >>> features, such >>> >>>> as computed columns and primary keys. That's frustrating for Table >>> API >>> >>>> users who want to register tables programmatically. Besides, >>> currently, a >>> >>>> connector must implement a corresponding Descriptor (e.g. `new >>> Kafka()`) >>> >>>> before using the "connect" API. Therefore, we hope to reduce this >>> effort >>> >>>> for connector developers, that custom source/sinks can be >>> registered via >>> >>>> the descriptor API without implementing a Descriptor. >>> >>>> >>> >>>> These are the problems we want to resolve in this FLIP. I'm looking >>> >>> forward >>> >>>> to your comments. >>> >>>> >>> >>>> >>> >>> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connector+for+Table+API >>> >>>> >>> >>>> Best, >>> >>>> Jark >>> >>>> >>> >>> >>> >>> >>> > >>> >>> >> >> -- >> Best, Jingsong Lee >> > |
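To make the option-key mismatch discussed under point 8 above concrete: the JSON format declares its option without a format prefix (the ConfigOption below is the one quoted from Leonard earlier in the thread), while the resulting table options need the prefixed key. The property lines in the comment are only an illustration following the FLIP-122 key conventions, not text from the FLIP itself.

public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = ConfigOptions
    .key("fail-on-missing-field")
    .booleanType()
    .defaultValue(false);

// A Kafka table using the JSON format ultimately needs the prefixed key:
//   'format' = 'json'
//   'json.fail-on-missing-field' = 'false'
// whereas a generic option(FAIL_ON_MISSING_FIELD, false) at the connector level
// would produce the unprefixed 'fail-on-missing-field', i.e. the wrong key.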
Hi Jark,
thanks for the update. I think the FLIP is in a really good shape now and ready to be voted. If others have no further comments? I have one last comment around the methods of the descriptor builders. When refactoring classes such as `KafkaConnector` or `ElasticsearchConnector`. We should align the method names with the new property names introduced in FLIP-122: KafkaConnector.newBuilder() // similar to scan.startup.mode=earliest-offset .scanStartupModeEarliest() // similar to sink.partitioner=round-robin .sinkPartitionerRoundRobin() What do you think? Thanks for driving this, Timo On 22.07.20 17:26, Jark Wu wrote: > Hi all, > > After some offline discussion with other people, I'm also fine with using > the builder pattern now, > even though I still think the `.build()` method is a little verbose in the > user code. > > I have updated the FLIP with following changes: > > 1) use builder pattern instead of "new" keyword. In order to avoid > duplicate code and reduce development burden for connector developers, > I introduced abstract classes `TableDescriptorBuilder` and > `FormatDescriptorBuilder`. > All the common methods are pre-defined in the base builder class, all > the custom descriptor builder should extend from the base builder classes. > And we can add more methods into the base builder class in the future > without changes in the connectors. > 2) use Expression instead of SQL expression string for computed column and > watermark strategy > 3) use `watermark(rowtime, expr)` as the watermark method. > 4) `Schema.column()` accepts `AbstractDataType` instead of `DataType` > 5) drop Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps > > A full example will look like this: > > tEnv.createTemporaryTable( > "MyTable", > KafkaConnector.newBuilder() > .version("0.11") > .topic("user_logs") > .property("bootstrap.servers", "localhost:9092") > .property("group.id", "test-group") > .startFromEarliest() > .sinkPartitionerRoundRobin() > .format(JsonFormat.newBuilder().ignoreParseErrors(false).build()) > .schema( > Schema.newBuilder() > .column("user_id", DataTypes.BIGINT()) > .column("user_name", DataTypes.STRING()) > .column("score", DataTypes.DECIMAL(10, 2)) > .column("log_ts", DataTypes.STRING()) > .column("part_field_0", DataTypes.STRING()) > .column("part_field_1", DataTypes.INT()) > .column("proc", proctime()) // define a processing-time > attribute with column name "proc" > .column("ts", toTimestamp($("log_ts"))) > .watermark("ts", $("ts").minus(lit(3).seconds())) > .primaryKey("user_id") > .build()) > .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't > support partitioned table yet, this is just an example for the API > .build() > ); > > I hope this resolves all your concerns. Welcome for further feedback! > > Updated FLIP: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API#FLIP129:RefactorDescriptorAPItoregisterconnectorsinTableAPI-FormatDescriptor&FormatDescriptorBuilder > > POC: > https://github.com/wuchong/flink/tree/descriptor-POC/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptor3 > > Best, > Jark > > On Thu, 16 Jul 2020 at 20:18, Jark Wu <[hidden email]> wrote: > >> Thank you all for the discussion! >> >> Here are my comments: >> >> 2) I agree we should support Expression as a computed column. But I'm in >> favor of Leonard's point that maybe we can also support SQL string >> expression as a computed column. >> Because it also keeps aligned with DDL. 
The concern for Expression is that >> converting Expression to SQL string, or (de)serializing Expression is >> another topic not clear and may involve lots of work. >> Maybe we can support Expression later if time permits. >> >> 6,7) I still prefer the "new" keyword over builder. I don't think >> immutable is a strong reason. I care more about usability and experience >> from users and devs perspective. >> - Users need to type more words if using builder: >> `KafkaConnector.newBuilder()...build()` vs `new KafkaConnector()...` >> - It's more difficult for developers to write a descriptor. 2 classes >> (KafkaConnector and KafkaConnectorBuilder) + 5 more methods (builders, >> schema, partitionedBy, like, etc..). >> With the "new" keyword all the common methods are defined by the >> framework. >> - It's hard to have the same API style for different connectors, because >> the common methods are defined by users. For example, some may have >> `withSchema`, `partitionKey`, `withLike`, etc... >> >> 8) I'm -1 to `ConfigOption`. The ConfigOption is not used on `JsonFormat`, >> but the generic `Connector#option`. This doesn't work when using format >> options. >> >> new Connector("kafka") >> .option(JsonOptions.IGNORE_PARSE_ERRORS, true); // this is wrong, >> because "kafka" requires "json.ignore-parse-errors" as the option key, not >> the "ignore-parse-errors". >> >> >> ======================================== >> Hi Timo, regarding having a complete new stack, I have thought about that. >> But I still prefer to refactor the existing stack. Reasons: >> Because I think it will be more confusing if users will see two similar >> stacks and may have many problems if using the wrong class. >> For example, we may have two `Schema` and `TableDescriptor` classes. The >> `KafkaConnector` can't be used in legacy `connect()` API, >> the legacy `Kafka` class can't be used in the new `createTemporaryTable()` >> API. >> Besides, the existing API has been deprecated in 1.11, I think it's fine >> to remove them in 1.12. >> >> >> Best, >> Jark >> >> >> On Thu, 16 Jul 2020 at 15:26, Jingsong Li <[hidden email]> wrote: >> >>> Thanks for the discussion. >>> >>> Descriptor lacks the watermark and the computed column is too long. >>> >>> 1) +1 for just `column(...)` >>> >>> 2) +1 for being consistent with Table API, the Java Table API should be >>> Expression DSL. We don't need pure string support, users should just use >>> DDL instead. I think this is just a schema descriptor? The schema >>> descriptor should be consistent with DDL, so, definitely, it should >>> contain computed columns information. >>> >>> 3) +1 for not containing Schema#proctime and >>> Schema#watermarkFor#boundedOutOfOrderTimestamps, we can just leave them in >>> legacy apis. >>> >>> 6,7) +1 for removing "new" and builder and making it immutable, For Jark, >>> the starting method is the static method, the others are not. >>> >>> 8) +1 for `ConfigOption`, this can be consistent with `WritableConfig`. >>> For Leonard, I don't think user needs “json.fail-on-missing-field” rather >>> than “fail-on-missing-field”, user should >>> need “fail-on-missing-field” rather than “json.fail-on-missing-field", the >>> recommended way is "JsonFormat.newInstance().option(....)", should >>> configure options in the format scope. >>> >>> Best, >>> Jingsong >>> >>> On Wed, Jul 15, 2020 at 9:30 PM Leonard Xu <[hidden email]> wrote: >>> >>>> Thanks Jark bring this discussion and organize the FLIP document. >>>> >>>> Thanks Dawid and Timo for the feedback. 
Here are my thoughts. >>>> >>>> 1) I’m +1 with using column() for both cases. >>>> >>>> 2) Expression DSL vs pure SQL string for computed columns >>>> >>>> I think we can support them both and implement the pure SQL String first, >>>> I agree that Expression DSL brings more possibility and flexibility, but >>>> using SQL string is a more unified way which can reuse most logic with DDL >>>> like validation and persist in Catalog, >>>> and Converting Expression DSL to SQL Expression is another big topic and >>>> I did not figure out a feasible idea until now. >>>> So, maybe we can postpone the Expression DSL support considered the >>>> reality. >>>> >>>> 3) Methods Schema#proctime and >>>> Schema#watermarkFor#boundedOutOfOrderTimestamps >>>> >>>> +1 with Dawid’s proposal to offer SQL like methods. >>>> Schema() >>>> .column("proctime", proctime()); >>>> .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds())) >>>> And we can simplify watermarkFor(“colName”, Expression >>>> watermarkStrategy)to watermark(“colName”, Expression watermarkStrategy), I >>>> think the later one has can express the meaning of “ WATERMARK FOR >>>> column_name AS watermark_strategy_expression“ well. >>>> >>>> 5)6)7) The new keyword vs the static method vs builder pattern >>>> >>>> I have not strong tendency, the new keyword and the static method on >>>> descriptor can nearly treated as a builder and do same things like >>>> builder. >>>> For the builder pattern, we will introduce six >>>> methods(connector.Builder()、connector.Builder.build(), format.Builder(), >>>> format.Builder.build(), Schema.Builder(),Schema.Builder.build() ),I think >>>> we could reduce these unnecessary methods. I ‘m slightly +1 for new >>>> keyword if we need a choice. >>>> >>>> 8) `Connector.option(...)` class should also accept `ConfigOption` >>>> I’m slightly -1 for this, ConfigOption may not work because the key for >>>> format configOption has not format prefix eg: FAIL_ON_MISSING_FIELD of >>>> json, we need “json.fail-on-missing-field” rather than >>>> “fail-on-missing-field”. >>>> >>>> public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = >>>> ConfigOptions >>>> .key("fail-on-missing-field") >>>> .booleanType() >>>> .defaultValue(false) >>>> >>>> WDYT? >>>> >>>> Best, >>>> Leonard Xu >>>> >>>> >>>>> 在 2020年7月15日,16:37,Timo Walther <[hidden email]> 写道: >>>>> >>>>> Hi Jark, >>>>> >>>>> thanks for working on this issue. It is time to fix this last part of >>>> inconsistency in the API. I also like the core parts of the FLIP, esp. that >>>> TableDescriptor is one entity that can be passed to different methods. Here >>>> is some feedback from my side: >>>>> >>>>> 1) +1 for just `column(...)` >>>>> >>>>> 2) Expression DSL vs pure SQL string for computed columns >>>>> I agree with Dawid. Using the Expression DSL is desireable for a >>>> consistent API. Furthermore, otherwise people need to register functions if >>>> they want to use them in an expression. Refactoring TableSchema is >>>> definitely on the list for 1.12. Maybe we can come up with some >>>> intermediate solution where we transform the expression to a SQL expression >>>> for the catalog. Until the discussions around FLIP-80 and >>>> CatalogTableSchema have been finalized. >>>>> >>>>> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps >>>>> We should design the descriptor very close to the SQL syntax. The more >>>> similar the syntax the more likely it is too keep the new descriptor API >>>> stable. 
>>>>> >>>>> 6) static method vs new keyword >>>>> Actually, the `new` keyword was one of the things that bothered me >>>> most in the old design. Fluent APIs avoid this nowadays. >>>>> >>>>> 7) make the descriptors immutable with builders >>>>> The descriptors are some kind of builders already. But they are not >>>> called "builder". Instead of coming up with the new concept of a >>>> "descriptor", we should use terminology that people esp. Java/Scala users >>>> are familiar with already. >>>>> >>>>> We could make the descriptors immutable to pass them around easily. >>>>> >>>>> Btw "Connector" and "Format" should always be in the classname. This >>>> was also a mistake in the past. Instead of calling the descriptor just >>>> `Kafka` we could call it `KafkaConnector`. An entire example could look >>>> like: >>>>> >>>>> tEnv.createTemporaryTable( >>>>> "OrdersInKafka", >>>>> KafkaConnector.newBuilder() // builder pattern supported by IDE >>>>> .topic("user_logs") >>>>> .property("bootstrap.servers", "localhost:9092") >>>>> .property("group.id", "test-group") >>>>> .format(JsonFormat.newInstance()) // shortcut for no parameters >>>>> .schema( >>>>> Schema.newBuilder() >>>>> .column("user_id", DataTypes.BIGINT()) >>>>> .column("score", DataTypes.DECIMAL(10, 2)) >>>>> .column("log_ts", DataTypes.TIMESTAMP(3)) >>>>> .column("my_ts", toTimestamp($("log_ts")) >>>>> .build() >>>>> ) >>>>> .build() >>>>> ); >>>>> >>>>> Instead of refacoring the existing classes, we could also think about >>>> a completly new stack. I think this would avoid confusion for the old >>>> users. We could deprecate the entire `Kafka` class instead of dealing with >>>> backwards compatibility. >>>>> >>>>> 8) minor extensions >>>>> A general `Connector.option(...)` class should also accept >>>> `ConfigOption` instead of only strings. >>>>> A `Schema.column()` should accept `AbstractDataType` that can be >>>> resolved to a `DataType` by access to a `DataTypeFactory`. >>>>> >>>>> What do you think? >>>>> >>>>> Thanks, >>>>> Timo >>>>> >>>>> >>>>> On 09.07.20 18:51, Jark Wu wrote: >>>>>> Hi Dawid, >>>>>> Thanks for the great feedback! Here are my responses: >>>>>> 1) computedColumn(..) vs column(..) >>>>>> I'm fine to use `column(..)` in both cases. >>>>>> 2) Expression DSL vs pure SQL string for computed columns >>>>>> This is a good point. Actually, I also prefer to use Expression DSL >>>> because >>>>>> this is more Table API style. >>>>>> However, this requires to modify TableSchema again to accept & expose >>>>>> Expression as computed columns. >>>>>> I'm not convinced about this, because AFAIK, we want to have a >>>>>> CatalogTableSchema to hold this information >>>>>> and don't want to extend TableSchema. Maybe Timo can give some points >>>> here. >>>>>> Besides, this will make the descriptor API can't be persisted in >>>> Catalog >>>>>> unless FLIP-80 is done. >>>>>> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps >>>>>> The original intention behind these APIs are providing shortcut APIs >>>> for >>>>>> Table API users. >>>>>> But I'm also fine to only provide the DDL-like methods if you have >>>>>> concerns. We can discuss shortcuts in the future if users request. >>>>>> 4) LikeOption >>>>>> LikeOption.INCLUDING.ALL is a constant (enum values). I have added >>>> more >>>>>> description about this in the FLIP. >>>>>> 5) implementation? >>>>>> I don't want to mention too much about implementation details in the >>>> FLIP >>>>>> at the beginning, because the API is already very long. 
>>>>>> But I also added an "Implementation" section to explain them. >>>>>> 6) static method vs new keyword >>>>>> Personally I prefer the new keyword because it makes the API cleaner. >>>> If we >>>>>> want remove new keyword and use static methods, we have to: >>>>>> Either adding a `Schema.builder()/create()` method as the starting >>>> method, >>>>>> Or duplicating all the methods as static methods, e.g. we have 12 >>>> methods >>>>>> in `Kafka`, any of them can be a starting method, then we will have 24 >>>>>> methods in `Kafka`. >>>>>> Both are not good, and it's hard to keep all the descriptors having >>>> the >>>>>> same starting method name, but all the descriptors can start from the >>>> same >>>>>> new keyword. >>>>>> Best, >>>>>> Jark >>>>>> On Thu, 9 Jul 2020 at 15:48, Dawid Wysakowicz <[hidden email] >>>>> >>>>>> wrote: >>>>>>> Correction to my point 4. The example is correct. I did not read it >>>>>>> carefully enough. Sorry for the confusion. Nevertheless I'd still >>>> like >>>>>>> to see a bit more explanation on the LikeOptions. >>>>>>> >>>>>>> On 07/07/2020 04:32, Jark Wu wrote: >>>>>>>> Hi everyone, >>>>>>>> >>>>>>>> Leonard and I prepared a FLIP about refactoring current Descriptor >>>> API, >>>>>>>> i.e. TableEnvironment#connect(). We would like to propose a new >>>>>>> descriptor >>>>>>>> API to register connectors in Table API. >>>>>>>> >>>>>>>> Since Flink 1.9, the community focused more on the new SQL DDL >>>> feature. >>>>>>>> After a series of releases, the SQL DDL is powerful and has many >>>> rich >>>>>>>> features now. However, Descriptor API (the >>>> `TableEnvironment#connect()`) >>>>>>>> has been stagnant for a long time and missing lots of core >>>> features, such >>>>>>>> as computed columns and primary keys. That's frustrating for Table >>>> API >>>>>>>> users who want to register tables programmatically. Besides, >>>> currently, a >>>>>>>> connector must implement a corresponding Descriptor (e.g. `new >>>> Kafka()`) >>>>>>>> before using the "connect" API. Therefore, we hope to reduce this >>>> effort >>>>>>>> for connector developers, that custom source/sinks can be >>>> registered via >>>>>>>> the descriptor API without implementing a Descriptor. >>>>>>>> >>>>>>>> These are the problems we want to resolve in this FLIP. I'm looking >>>>>>> forward >>>>>>>> to your comments. >>>>>>>> >>>>>>>> >>>>>>> >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connector+for+Table+API >>>>>>>> >>>>>>>> Best, >>>>>>>> Jark >>>>>>>> >>>>>>> >>>>>>> >>>>> >>>> >>>> >>> >>> -- >>> Best, Jingsong Lee >>> >> > |
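As a sketch of what Timo's naming suggestion above could mean for connector developers: each FLIP-122 property gets a dedicated builder method. Only the method names and option keys come from the mail; the class body, the option(...) helper, and the inheritance from the common TableDescriptorBuilder are assumptions.

// Sketch only: method bodies and the option(...) plumbing are assumed, not from the FLIP.
public class KafkaConnectorBuilder extends TableDescriptorBuilder<KafkaConnectorBuilder> {

    // corresponds to 'scan.startup.mode' = 'earliest-offset'
    public KafkaConnectorBuilder scanStartupModeEarliest() {
        return option("scan.startup.mode", "earliest-offset");
    }

    // corresponds to 'sink.partitioner' = 'round-robin'
    public KafkaConnectorBuilder sinkPartitionerRoundRobin() {
        return option("sink.partitioner", "round-robin");
    }

    // (any abstract hooks required by the base builder class are omitted here)
}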
Hi Jark,

Thanks for the update. I think the FLIP looks really good on the high level.

I have a few comments on the code structure in the FLIP:

1) I really don't like how the TableDescriptor exposes protected fields. Moreover, why do we need to extend from it? I don't think we need KafkaConnector extends TableDescriptor and the like. We only need the builders, e.g. the KafkaConnectorBuilder.

If I understand it correctly, this is the interface needed from the TableEnvironment perspective and it is the contract that the TableEnvironment expects. I would suggest making it an interface:

@PublicEvolving
public interface TableDescriptor {
    List<String> getPartitionedFields();
    Schema getSchema();
    Map<String, String> getOptions();
    LikeOption[] getLikeOptions();
    String getLikePath();
}
Then the TableDescriptorBuilder would work with an internal implementation of this interface:

@PublicEvolving
public abstract class TableDescriptorBuilder<BUILDER extends TableDescriptorBuilder<BUILDER>> {

    private final InternalTableDescriptor descriptor = new InternalTableDescriptor();

    /**
     * Returns this builder instance in the type of the subclass.
     */
    protected abstract BUILDER self();

    /**
     * Specifies the table schema.
     */
    public BUILDER schema(Schema schema) {
        descriptor.schema = schema;
        return self();
    }

    /**
     * Specifies the partition keys of this table.
     */
    public BUILDER partitionedBy(String... fieldNames) {
        checkArgument(descriptor.partitionedFields.isEmpty(), "partitionedBy(...) shouldn't be called more than once.");
        descriptor.partitionedFields.addAll(Arrays.asList(fieldNames));
        return self();
    }

    /**
     * Extends some parts from the original registered table path.
     */
    public BUILDER like(String tablePath, LikeOption... likeOptions) {
        descriptor.likePath = tablePath;
        descriptor.likeOptions = likeOptions;
        return self();
    }

    protected BUILDER option(String key, String value) {
        descriptor.options.put(key, value);
        return self();
    }

    /**
     * Returns created table descriptor.
     */
    public TableDescriptor build() {
        return descriptor;
    }
}
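The InternalTableDescriptor used above is not spelled out in the mail; a minimal sketch of such an internal implementation of the proposed interface could look as follows (field visibility, defaults, and the assumption that it lives in the same package as the builder are all mine):

// Hypothetical internal implementation backing the builder; not part of the proposal text.
// Package-private fields so the builder in the same package can set them directly.
class InternalTableDescriptor implements TableDescriptor {

    Schema schema;
    final List<String> partitionedFields = new ArrayList<>();
    final Map<String, String> options = new HashMap<>();
    LikeOption[] likeOptions = new LikeOption[0];
    String likePath;

    @Override
    public List<String> getPartitionedFields() {
        return partitionedFields;
    }

    @Override
    public Schema getSchema() {
        return schema;
    }

    @Override
    public Map<String, String> getOptions() {
        return options;
    }

    @Override
    public LikeOption[] getLikeOptions() {
        return likeOptions;
    }

    @Override
    public String getLikePath() {
        return likePath;
    }
}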
2) I'm also not the biggest fan of how the LikeOptions are suggested in the doc. Can't we have something more like:

class LikeOption {

    public enum MergingStrategy {
        INCLUDING,
        EXCLUDING,
        OVERWRITING
    }

    public enum FeatureOption {
        ALL,
        CONSTRAINTS,
        GENERATED,
        OPTIONS,
        PARTITIONS,
        WATERMARKS
    }

    private final MergingStrategy mergingStrategy;
    private final FeatureOption featureOption;

    public static final LikeOption including(FeatureOption option) {
        return new LikeOption(MergingStrategy.INCLUDING, option);
    }

    public static final LikeOption overwriting(FeatureOption option) {
        Preconditions.checkArgument(option != FeatureOption.ALL && ...);
        return new LikeOption(MergingStrategy.OVERWRITING, option);
    }
}
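A call-site sketch of this factory-method variant, using the like(...) signature from the builder above; the table name and the particular merging choices are placeholders:

// Hypothetical call site for the LikeOption sketch above.
KafkaConnector.newBuilder()
    // ... schema, format, options, etc.
    .like(
        "OrdersInKafka",
        LikeOption.including(LikeOption.FeatureOption.ALL),
        LikeOption.overwriting(LikeOption.FeatureOption.OPTIONS))
    .build();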
3) TableEnvironment#from(descriptor) will register descriptor under a system generated table path (just like TableImpl#toString) first, and scan from the table path to derive the Table. Table#executeInsert() does it in the similar way.

I would try not to register the table under a generated table path. Do we really need that? I am pretty sure we can use the tables without registering them in a catalog, similarly to the old TableSourceQueryOperation.

Otherwise looks good.

Best,
Dawid
On 23/07/2020 10:35, Timo Walther wrote:
> Hi Jark, |
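For point 3 of Dawid's mail above, the user-facing call in question is TableEnvironment#from(descriptor). A rough usage sketch follows; the descriptor contents are placeholders taken from earlier examples in the thread, and whether such an inline table is backed by a generated catalog entry or stays unregistered is exactly what is being debated:

// Derive a Table directly from a descriptor, without an explicit
// createTemporaryTable(...) call; all values below are placeholders.
Table orders = tEnv.from(
    KafkaConnector.newBuilder()
        .topic("user_logs")
        .format(JsonFormat.newBuilder().build())
        .schema(
            Schema.newBuilder()
                .column("user_id", DataTypes.BIGINT())
                .column("log_ts", DataTypes.TIMESTAMP(3))
                .build())
        .build());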
Hi Timo,
That's a good point I missed in the design. I have updated the FLIP and added a note under the `KafkaConnector` to mention this. I will not list all the method names in the FLIP as the design doc is super long now. ================================================================ Hi Dawid, 1) KafkaConnector not extends TableDescriptor The reason why KafkaConnector extends TableDescriptor is that, a builder pattern "KafkaConnector.newBuilder()...build()" should return "KafkaConnector" in theory. So users can write something like the following code which might be more intuitive. KafkaConnector kafka = KafkaConnector.newBuilder()...build(); tEnv.createTemporaryTable("MyTable", kafka); But I agree connector implementation will be simpler if this is not strongly needed, e.g. we don't need the generic type for descriptor, we don't need to pass the descriptor class in the builder. So I'm also fine to not extend it if others don't against it. What's your opinion here @Timo Walther <[hidden email]> ? 2) LikeOptions I am not very satisfied with the new design. Because the API is not very fluent. Users will be interrupted to consider what the `overwrite()` parameter to be. And the API design doesn't protect users from using the wrong options before running the code. What about to list all possible options in one level? This will be more aligned with SQL DDL and easy to understand and use for users. public enum LikeOption { INCLUDING_ALL, INCLUDING_CONSTRAINTS, INCLUDING_GENERATED, INCLUDING_OPTIONS, INCLUDING_PARTITIONS, INCLUDING_WATERMARKS, EXCLUDING_ALL, EXCLUDING_CONSTRAINTS, EXCLUDING_GENERATED, EXCLUDING_OPTIONS, EXCLUDING_PARTITIONS, EXCLUDING_WATERMARKS, OVERWRITING_GENERATED, OVERWRITING_OPTIONS } 3) register the table under a generated table path I'm afraid we have to do that. The generated table path is still needed for `TableSourceTable#tableIdentifier` which is used to calculate the digest. This requires that the registered table must have an unique identifier. The old `TableSourceQueryOperation` will also generate the identifier according to the hashcode of the TableSource object. However, the generated identifier "Unregistered_TableSource_1234" is still possible to be in conflict with the user's table path. Therefore, I prefer to register the generated name in the (temporary) catalog to throw explicit exceptions, rather than generating a wrong plan. ================================================================ Hi @Leonard Xu <[hidden email]> and @Jingsong Li <[hidden email]> , Do you have other concerns on the latest FLIP and the above discussion? Best, Jark On Thu, 23 Jul 2020 at 18:26, Dawid Wysakowicz <[hidden email]> wrote: > Hi Jark, > > Thanks for the update. I think the FLIP looks really well on the high > level. > > I have a few comments to the code structure in the FLIP: > > 1) I really don't like how the TableDescriptor exposes protected fields. > Moreover why do we need to extend from it? I don't think we need > KafkaConnector extends TableDescriptor and alike. We only need the builders > e.g. the KafkaConnectorBuilder. > > If I understand it correctly this is the interface needed from the > TableEnvironment perspective and it is the contract that the > TableEnvironment expects. 
I would suggest making it an interface: > @PublicEvolving > public interface TableDescriptor { > List<String> getPartitionedFields(); > Schema getSchema(); > Map<String, String> getOptions(); > LikeOption[] getLikeOptions(); > String getLikePath(); > } > > Then the TableDescriptorBuilder would work with an internal implementation > of this interface > @PublicEvolving > public abstract class TableDescriptorBuilder<BUILDER extends TableDescriptorBuilder<BUILDER>> > { > > private final InternalTableDescriptor descriptor = new > InternalTableDescriptor(); > > /** > * Returns the this builder instance in the type of subclass. > */ > protected abstract BUILDER self(); > > /** > * Specifies the table schema. > */ > public BUILDER schema(Schema schema) { > descriptor.schema = schema; > return self(); > } > > /** > * Specifies the partition keys of this table. > */ > public BUILDER partitionedBy(String... fieldNames) { > checkArgument(descriptor.partitionedFields.isEmpty(), "partitionedBy(...) > shouldn't be called more than once."); > descriptor.partitionedFields.addAll(Arrays.asList(fieldNames)); > return self(); > } > > /** > * Extends some parts from the original registered table path. > */ > public BUILDER like(String tablePath, LikeOption... likeOptions) { > descriptor.likePath = tablePath; > descriptor.likeOptions = likeOptions; > return self(); > } > > protected BUILDER option(String key, String value) { > descriptor.options.put(key, value); > return self(); > } > > /** > * Returns created table descriptor. > */ > public TableDescriptor build() { > return descriptor; > } > } > > > 2) I'm also not the biggest fun of how the LikeOptions are suggested in > the doc. Can't we have something more like > > class LikeOption { > > public enum MergingStrategy { > INCLUDING, > EXCLUDING, > OVERWRITING > } > > public enum FeatureOption { > ALL, > CONSTRAINTS, > GENERATED, > OPTIONS, > PARTITIONS, > WATERMARKS > } > > private final MergingStrategy mergingStrategy; > private final FeatureOption featureOption; > > > public static final LikeOption including(FeatureOption option) { > > return new LikeOption(MergingStrategy.INCLUDING, option); > > } > > public static final LikeOption overwriting(FeatureOption option) { > > Preconditions.checkArgument(option != ALL && ...); > > return new LikeOption(MergingStrategy.INCLUDING, option); > > } > > } > > > 3) TableEnvironment#from(descriptor) will register descriptor under a > system generated table path (just like TableImpl#toString) first, and scan > from the table path to derive the Table. Table#executeInsert() does it in > the similar way. > > I would try not to register the table under a generated table path. Do we > really need that? I am pretty sure we can use the tables without > registering them in a catalog. Similarly to the old > TableSourceQueryOperation. > > Otherwise looks good > > Best, > > Dawid > > On 23/07/2020 10:35, Timo Walther wrote: > > Hi Jark, > > thanks for the update. I think the FLIP is in a really good shape now and > ready to be voted. If others have no further comments? > > I have one last comment around the methods of the descriptor builders. > When refactoring classes such as `KafkaConnector` or > `ElasticsearchConnector`. We should align the method names with the new > property names introduced in FLIP-122: > > KafkaConnector.newBuilder() > // similar to scan.startup.mode=earliest-offset > .scanStartupModeEarliest() > // similar to sink.partitioner=round-robin > .sinkPartitionerRoundRobin() > > What do you think? 
> > Thanks for driving this, > Timo > > > On 22.07.20 17:26, Jark Wu wrote: > > Hi all, > > After some offline discussion with other people, I'm also fine with using > the builder pattern now, > even though I still think the `.build()` method is a little verbose in > the > user code. > > I have updated the FLIP with following changes: > > 1) use builder pattern instead of "new" keyword. In order to avoid > duplicate code and reduce development burden for connector developers, > I introduced abstract classes `TableDescriptorBuilder` and > `FormatDescriptorBuilder`. > All the common methods are pre-defined in the base builder class, all > the custom descriptor builder should extend from the base builder classes. > And we can add more methods into the base builder class in the future > without changes in the connectors. > 2) use Expression instead of SQL expression string for computed column and > watermark strategy > 3) use `watermark(rowtime, expr)` as the watermark method. > 4) `Schema.column()` accepts `AbstractDataType` instead of `DataType` > 5) drop Schema#proctime and > Schema#watermarkFor#boundedOutOfOrderTimestamps > > A full example will look like this: > > tEnv.createTemporaryTable( > "MyTable", > KafkaConnector.newBuilder() > .version("0.11") > .topic("user_logs") > .property("bootstrap.servers", "localhost:9092") > .property("group.id", "test-group") > .startFromEarliest() > .sinkPartitionerRoundRobin() > .format(JsonFormat.newBuilder().ignoreParseErrors(false).build()) > .schema( > Schema.newBuilder() > .column("user_id", DataTypes.BIGINT()) > .column("user_name", DataTypes.STRING()) > .column("score", DataTypes.DECIMAL(10, 2)) > .column("log_ts", DataTypes.STRING()) > .column("part_field_0", DataTypes.STRING()) > .column("part_field_1", DataTypes.INT()) > .column("proc", proctime()) // define a processing-time > attribute with column name "proc" > .column("ts", toTimestamp($("log_ts"))) > .watermark("ts", $("ts").minus(lit(3).seconds())) > .primaryKey("user_id") > .build()) > .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't > support partitioned table yet, this is just an example for the API > .build() > ); > > I hope this resolves all your concerns. Welcome for further feedback! > > Updated FLIP: > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API#FLIP129:RefactorDescriptorAPItoregisterconnectorsinTableAPI-FormatDescriptor&FormatDescriptorBuilder > > POC: > > https://github.com/wuchong/flink/tree/descriptor-POC/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptor3 > > Best, > Jark > > On Thu, 16 Jul 2020 at 20:18, Jark Wu <[hidden email]> > <[hidden email]> wrote: > > Thank you all for the discussion! > > Here are my comments: > > 2) I agree we should support Expression as a computed column. But I'm in > favor of Leonard's point that maybe we can also support SQL string > expression as a computed column. > Because it also keeps aligned with DDL. The concern for Expression is that > converting Expression to SQL string, or (de)serializing Expression is > another topic not clear and may involve lots of work. > Maybe we can support Expression later if time permits. > > 6,7) I still prefer the "new" keyword over builder. I don't think > immutable is a strong reason. I care more about usability and experience > from users and devs perspective. 
> - Users need to type more words if using builder: > `KafkaConnector.newBuilder()...build()` vs `new KafkaConnector()...` > - It's more difficult for developers to write a descriptor. 2 classes > (KafkaConnector and KafkaConnectorBuilder) + 5 more methods (builders, > schema, partitionedBy, like, etc..). > With the "new" keyword all the common methods are defined by the > framework. > - It's hard to have the same API style for different connectors, > because > the common methods are defined by users. For example, some may have > `withSchema`, `partitionKey`, `withLike`, etc... > > 8) I'm -1 to `ConfigOption`. The ConfigOption is not used on `JsonFormat`, > but the generic `Connector#option`. This doesn't work when using format > options. > > new Connector("kafka") > .option(JsonOptions.IGNORE_PARSE_ERRORS, true); // this is wrong, > because "kafka" requires "json.ignore-parse-errors" as the option key, not > the "ignore-parse-errors". > > > ======================================== > Hi Timo, regarding having a complete new stack, I have thought about that. > But I still prefer to refactor the existing stack. Reasons: > Because I think it will be more confusing if users will see two similar > stacks and may have many problems if using the wrong class. > For example, we may have two `Schema` and `TableDescriptor` classes. The > `KafkaConnector` can't be used in legacy `connect()` API, > the legacy `Kafka` class can't be used in the new `createTemporaryTable()` > API. > Besides, the existing API has been deprecated in 1.11, I think it's fine > to remove them in 1.12. > > > Best, > Jark > > > On Thu, 16 Jul 2020 at 15:26, Jingsong Li <[hidden email]> > <[hidden email]> wrote: > > Thanks for the discussion. > > Descriptor lacks the watermark and the computed column is too long. > > 1) +1 for just `column(...)` > > 2) +1 for being consistent with Table API, the Java Table API should be > Expression DSL. We don't need pure string support, users should just use > DDL instead. I think this is just a schema descriptor? The schema > descriptor should be consistent with DDL, so, definitely, it should > contain computed columns information. > > 3) +1 for not containing Schema#proctime and > Schema#watermarkFor#boundedOutOfOrderTimestamps, we can just leave them in > legacy apis. > > 6,7) +1 for removing "new" and builder and making it immutable, For Jark, > the starting method is the static method, the others are not. > > 8) +1 for `ConfigOption`, this can be consistent with `WritableConfig`. > For Leonard, I don't think user needs “json.fail-on-missing-field” rather > than “fail-on-missing-field”, user should > need “fail-on-missing-field” rather than “json.fail-on-missing-field", the > recommended way is "JsonFormat.newInstance().option(....)", should > configure options in the format scope. > > Best, > Jingsong > > On Wed, Jul 15, 2020 at 9:30 PM Leonard Xu <[hidden email]> > <[hidden email]> wrote: > > Thanks Jark bring this discussion and organize the FLIP document. > > Thanks Dawid and Timo for the feedback. Here are my thoughts. > > 1) I’m +1 with using column() for both cases. 
> > 2) Expression DSL vs pure SQL string for computed columns > > I think we can support them both and implement the pure SQL String first, > I agree that Expression DSL brings more possibility and flexibility, but > using SQL string is a more unified way which can reuse most logic with DDL > like validation and persist in Catalog, > and Converting Expression DSL to SQL Expression is another big topic and > I did not figure out a feasible idea until now. > So, maybe we can postpone the Expression DSL support considered the > reality. > > 3) Methods Schema#proctime and > Schema#watermarkFor#boundedOutOfOrderTimestamps > > +1 with Dawid’s proposal to offer SQL like methods. > Schema() > .column("proctime", proctime()); > .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds())) > And we can simplify watermarkFor(“colName”, Expression > watermarkStrategy)to watermark(“colName”, Expression watermarkStrategy), I > think the later one has can express the meaning of “ WATERMARK FOR > column_name AS watermark_strategy_expression“ well. > > 5)6)7) The new keyword vs the static method vs builder pattern > > I have not strong tendency, the new keyword and the static method on > descriptor can nearly treated as a builder and do same things like > builder. > For the builder pattern, we will introduce six > methods(connector.Builder()、connector.Builder.build(), format.Builder(), > format.Builder.build(), Schema.Builder(),Schema.Builder.build() ),I think > we could reduce these unnecessary methods. I ‘m slightly +1 for new > keyword if we need a choice. > > 8) `Connector.option(...)` class should also accept `ConfigOption` > I’m slightly -1 for this, ConfigOption may not work because the key for > format configOption has not format prefix eg: FAIL_ON_MISSING_FIELD of > json, we need “json.fail-on-missing-field” rather than > “fail-on-missing-field”. > > public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = > ConfigOptions > .key("fail-on-missing-field") > .booleanType() > .defaultValue(false) > > WDYT? > > Best, > Leonard Xu > > > 在 2020年7月15日,16:37,Timo Walther <[hidden email]> <[hidden email]> > 写道: > > Hi Jark, > > thanks for working on this issue. It is time to fix this last part of > > inconsistency in the API. I also like the core parts of the FLIP, esp. > that > TableDescriptor is one entity that can be passed to different methods. > Here > is some feedback from my side: > > > 1) +1 for just `column(...)` > > 2) Expression DSL vs pure SQL string for computed columns > I agree with Dawid. Using the Expression DSL is desireable for a > > consistent API. Furthermore, otherwise people need to register functions > if > they want to use them in an expression. Refactoring TableSchema is > definitely on the list for 1.12. Maybe we can come up with some > intermediate solution where we transform the expression to a SQL > expression > for the catalog. Until the discussions around FLIP-80 and > CatalogTableSchema have been finalized. > > > 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps > We should design the descriptor very close to the SQL syntax. The more > > similar the syntax the more likely it is too keep the new descriptor API > stable. > > > 6) static method vs new keyword > Actually, the `new` keyword was one of the things that bothered me > > most in the old design. Fluent APIs avoid this nowadays. > > > 7) make the descriptors immutable with builders > The descriptors are some kind of builders already. But they are not > > called "builder". 
Instead of coming up with the new concept of a > "descriptor", we should use terminology that people esp. Java/Scala users > are familiar with already. > > > We could make the descriptors immutable to pass them around easily. > > Btw "Connector" and "Format" should always be in the classname. This > > was also a mistake in the past. Instead of calling the descriptor just > `Kafka` we could call it `KafkaConnector`. An entire example could look > like: > > > tEnv.createTemporaryTable( > "OrdersInKafka", > KafkaConnector.newBuilder() // builder pattern supported by IDE > .topic("user_logs") > .property("bootstrap.servers", "localhost:9092") > .property("group.id", "test-group") > .format(JsonFormat.newInstance()) // shortcut for no parameters > .schema( > Schema.newBuilder() > .column("user_id", DataTypes.BIGINT()) > .column("score", DataTypes.DECIMAL(10, 2)) > .column("log_ts", DataTypes.TIMESTAMP(3)) > .column("my_ts", toTimestamp($("log_ts")) > .build() > ) > .build() > ); > > Instead of refacoring the existing classes, we could also think about > > a completly new stack. I think this would avoid confusion for the old > users. We could deprecate the entire `Kafka` class instead of dealing with > backwards compatibility. > > > 8) minor extensions > A general `Connector.option(...)` class should also accept > > `ConfigOption` instead of only strings. > > A `Schema.column()` should accept `AbstractDataType` that can be > > resolved to a `DataType` by access to a `DataTypeFactory`. > > > What do you think? > > Thanks, > Timo > > > On 09.07.20 18:51, Jark Wu wrote: > > Hi Dawid, > Thanks for the great feedback! Here are my responses: > 1) computedColumn(..) vs column(..) > I'm fine to use `column(..)` in both cases. > 2) Expression DSL vs pure SQL string for computed columns > This is a good point. Actually, I also prefer to use Expression DSL > > because > > this is more Table API style. > However, this requires to modify TableSchema again to accept & expose > Expression as computed columns. > I'm not convinced about this, because AFAIK, we want to have a > CatalogTableSchema to hold this information > and don't want to extend TableSchema. Maybe Timo can give some points > > here. > > Besides, this will make the descriptor API can't be persisted in > > Catalog > > unless FLIP-80 is done. > 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps > The original intention behind these APIs are providing shortcut APIs > > for > > Table API users. > But I'm also fine to only provide the DDL-like methods if you have > concerns. We can discuss shortcuts in the future if users request. > 4) LikeOption > LikeOption.INCLUDING.ALL is a constant (enum values). I have added > > more > > description about this in the FLIP. > 5) implementation? > I don't want to mention too much about implementation details in the > > FLIP > > at the beginning, because the API is already very long. > But I also added an "Implementation" section to explain them. > 6) static method vs new keyword > Personally I prefer the new keyword because it makes the API cleaner. > > If we > > want remove new keyword and use static methods, we have to: > Either adding a `Schema.builder()/create()` method as the starting > > method, > > Or duplicating all the methods as static methods, e.g. we have 12 > > methods > > in `Kafka`, any of them can be a starting method, then we will have 24 > methods in `Kafka`. 
> Both are not good, and it's hard to keep all the descriptors having > > the > > same starting method name, but all the descriptors can start from the > > same > > new keyword. > Best, > Jark > On Thu, 9 Jul 2020 at 15:48, Dawid Wysakowicz <[hidden email] > > > wrote: > > Correction to my point 4. The example is correct. I did not read it > carefully enough. Sorry for the confusion. Nevertheless I'd still > > like > > to see a bit more explanation on the LikeOptions. > > On 07/07/2020 04:32, Jark Wu wrote: > > Hi everyone, > > Leonard and I prepared a FLIP about refactoring current Descriptor > > API, > > i.e. TableEnvironment#connect(). We would like to propose a new > > descriptor > > API to register connectors in Table API. > > Since Flink 1.9, the community focused more on the new SQL DDL > > feature. > > After a series of releases, the SQL DDL is powerful and has many > > rich > > features now. However, Descriptor API (the > > `TableEnvironment#connect()`) > > has been stagnant for a long time and missing lots of core > > features, such > > as computed columns and primary keys. That's frustrating for Table > > API > > users who want to register tables programmatically. Besides, > > currently, a > > connector must implement a corresponding Descriptor (e.g. `new > > Kafka()`) > > before using the "connect" API. Therefore, we hope to reduce this > > effort > > for connector developers, that custom source/sinks can be > > registered via > > the descriptor API without implementing a Descriptor. > > These are the problems we want to resolve in this FLIP. I'm looking > > forward > > to your comments. > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connector+for+Table+API > > > Best, > Jark > > > > > > > > -- > Best, Jingsong Lee > > > > > |
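For comparison with the factory-method variant sketched earlier, a call-site sketch of the flat enum Jark proposes above, mirroring the SQL "LIKE ... (INCLUDING ALL, OVERWRITING OPTIONS)" clause; the builder chain is again only a placeholder:

// Hypothetical call site for the flat LikeOption enum.
KafkaConnector.newBuilder()
    // ... schema, format, options, etc.
    .like(
        "OrdersInKafka",
        LikeOption.INCLUDING_ALL,
        LikeOption.OVERWRITING_OPTIONS)
    .build();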
Hi Jark,
Ad. 1 Personally I don't see any benefit of having the specific descriptors (KafkaDescriptor/ElasticDescriptor/...). E.g. the KafkaDescriptor would not differ at all from other descriptors. I don't think returning always a TableDescriptor would be confusing for users. Therefore I am very much in favour of simplifying it. Ad. 2 Honestly, here we are discussing personal preferences. Let's go with your original suggestion. I am not strong on this. Ad. 3 My personal take is that in this case the digest should not include a table identifier as we don't have one. I could see the digest being calculated from the properties/CatalogTable. That way you would have the same digest for all inline tables with the same properties. In your approach two scans of the same descriptor will have different digests. (I am not saying this is particularly wrong, but different from scan("test") ). Similarly, we do not register filters/joins/groupings under generated names just to compute the digest. One additional problem you will have is you will need to add handling of that special tables when e.g. listing tables. Nevertheless I don't want to block the progress on this. I'd appreciate though if you consider that again during the implementation. Best, Dawid On 23/07/2020 16:56, Jark Wu wrote: > Hi Timo, > > That's a good point I missed in the design. I have updated the FLIP and > added a note under the `KafkaConnector` to mention this. > I will not list all the method names in the FLIP as the design doc is super > long now. > > ================================================================ > Hi Dawid, > > 1) KafkaConnector not extends TableDescriptor > The reason why KafkaConnector extends TableDescriptor is that, a builder > pattern "KafkaConnector.newBuilder()...build()" should return > "KafkaConnector" in theory. > So users can write something like the following code which might be > more intuitive. > > KafkaConnector kafka = KafkaConnector.newBuilder()...build(); > tEnv.createTemporaryTable("MyTable", kafka); > > But I agree connector implementation will be simpler if this is not > strongly needed, e.g. we don't need the generic type for descriptor, > we don't need to pass the descriptor class in the builder. So I'm also fine > to not extend it if others don't against it. What's your opinion here @Timo > Walther <[hidden email]> ? > > 2) LikeOptions > I am not very satisfied with the new design. Because the API is not very > fluent. Users will be interrupted to consider what the `overwrite()` > parameter to be. > And the API design doesn't protect users from using the wrong options > before running the code. > What about to list all possible options in one level? This will be more > aligned with SQL DDL and easy to understand and use for users. > > public enum LikeOption { > INCLUDING_ALL, > INCLUDING_CONSTRAINTS, > INCLUDING_GENERATED, > INCLUDING_OPTIONS, > INCLUDING_PARTITIONS, > INCLUDING_WATERMARKS, > > EXCLUDING_ALL, > EXCLUDING_CONSTRAINTS, > EXCLUDING_GENERATED, > EXCLUDING_OPTIONS, > EXCLUDING_PARTITIONS, > EXCLUDING_WATERMARKS, > > OVERWRITING_GENERATED, > OVERWRITING_OPTIONS > } > > 3) register the table under a generated table path > I'm afraid we have to do that. The generated table path is still needed for > `TableSourceTable#tableIdentifier` which is used to calculate the digest. > This requires that the registered table must have an unique identifier. The > old `TableSourceQueryOperation` will also generate the identifier according > to the hashcode of the TableSource object. 
However, the generated > identifier "Unregistered_TableSource_1234" is still possible to be in > conflict with > the user's table path. Therefore, I prefer to register the generated name > in the (temporary) catalog to throw explicit exceptions, rather than > generating a wrong plan. > > ================================================================ > Hi @Leonard Xu <[hidden email]> and @Jingsong Li <[hidden email]> > , > > Do you have other concerns on the latest FLIP and the above discussion? > > Best, > Jark > > On Thu, 23 Jul 2020 at 18:26, Dawid Wysakowicz <[hidden email]> > wrote: > >> Hi Jark, >> >> Thanks for the update. I think the FLIP looks really well on the high >> level. >> >> I have a few comments to the code structure in the FLIP: >> >> 1) I really don't like how the TableDescriptor exposes protected fields. >> Moreover why do we need to extend from it? I don't think we need >> KafkaConnector extends TableDescriptor and alike. We only need the builders >> e.g. the KafkaConnectorBuilder. >> >> If I understand it correctly this is the interface needed from the >> TableEnvironment perspective and it is the contract that the >> TableEnvironment expects. I would suggest making it an interface: >> @PublicEvolving >> public interface TableDescriptor { >> List<String> getPartitionedFields(); >> Schema getSchema(); >> Map<String, String> getOptions(); >> LikeOption[] getLikeOptions(); >> String getLikePath(); >> } >> >> Then the TableDescriptorBuilder would work with an internal implementation >> of this interface >> @PublicEvolving >> public abstract class TableDescriptorBuilder<BUILDER extends TableDescriptorBuilder<BUILDER>> >> { >> >> private final InternalTableDescriptor descriptor = new >> InternalTableDescriptor(); >> >> /** >> * Returns the this builder instance in the type of subclass. >> */ >> protected abstract BUILDER self(); >> >> /** >> * Specifies the table schema. >> */ >> public BUILDER schema(Schema schema) { >> descriptor.schema = schema; >> return self(); >> } >> >> /** >> * Specifies the partition keys of this table. >> */ >> public BUILDER partitionedBy(String... fieldNames) { >> checkArgument(descriptor.partitionedFields.isEmpty(), "partitionedBy(...) >> shouldn't be called more than once."); >> descriptor.partitionedFields.addAll(Arrays.asList(fieldNames)); >> return self(); >> } >> >> /** >> * Extends some parts from the original registered table path. >> */ >> public BUILDER like(String tablePath, LikeOption... likeOptions) { >> descriptor.likePath = tablePath; >> descriptor.likeOptions = likeOptions; >> return self(); >> } >> >> protected BUILDER option(String key, String value) { >> descriptor.options.put(key, value); >> return self(); >> } >> >> /** >> * Returns created table descriptor. >> */ >> public TableDescriptor build() { >> return descriptor; >> } >> } >> >> >> 2) I'm also not the biggest fun of how the LikeOptions are suggested in >> the doc. 
Can't we have something more like >> >> class LikeOption { >> >> public enum MergingStrategy { >> INCLUDING, >> EXCLUDING, >> OVERWRITING >> } >> >> public enum FeatureOption { >> ALL, >> CONSTRAINTS, >> GENERATED, >> OPTIONS, >> PARTITIONS, >> WATERMARKS >> } >> >> private final MergingStrategy mergingStrategy; >> private final FeatureOption featureOption; >> >> >> public static final LikeOption including(FeatureOption option) { >> >> return new LikeOption(MergingStrategy.INCLUDING, option); >> >> } >> >> public static final LikeOption overwriting(FeatureOption option) { >> >> Preconditions.checkArgument(option != ALL && ...); >> >> return new LikeOption(MergingStrategy.INCLUDING, option); >> >> } >> >> } >> >> >> 3) TableEnvironment#from(descriptor) will register descriptor under a >> system generated table path (just like TableImpl#toString) first, and scan >> from the table path to derive the Table. Table#executeInsert() does it in >> the similar way. >> >> I would try not to register the table under a generated table path. Do we >> really need that? I am pretty sure we can use the tables without >> registering them in a catalog. Similarly to the old >> TableSourceQueryOperation. >> >> Otherwise looks good >> >> Best, >> >> Dawid >> >> On 23/07/2020 10:35, Timo Walther wrote: >> >> Hi Jark, >> >> thanks for the update. I think the FLIP is in a really good shape now and >> ready to be voted. If others have no further comments? >> >> I have one last comment around the methods of the descriptor builders. >> When refactoring classes such as `KafkaConnector` or >> `ElasticsearchConnector`. We should align the method names with the new >> property names introduced in FLIP-122: >> >> KafkaConnector.newBuilder() >> // similar to scan.startup.mode=earliest-offset >> .scanStartupModeEarliest() >> // similar to sink.partitioner=round-robin >> .sinkPartitionerRoundRobin() >> >> What do you think? >> >> Thanks for driving this, >> Timo >> >> >> On 22.07.20 17:26, Jark Wu wrote: >> >> Hi all, >> >> After some offline discussion with other people, I'm also fine with using >> the builder pattern now, >> even though I still think the `.build()` method is a little verbose in >> the >> user code. >> >> I have updated the FLIP with following changes: >> >> 1) use builder pattern instead of "new" keyword. In order to avoid >> duplicate code and reduce development burden for connector developers, >> I introduced abstract classes `TableDescriptorBuilder` and >> `FormatDescriptorBuilder`. >> All the common methods are pre-defined in the base builder class, all >> the custom descriptor builder should extend from the base builder classes. >> And we can add more methods into the base builder class in the future >> without changes in the connectors. >> 2) use Expression instead of SQL expression string for computed column and >> watermark strategy >> 3) use `watermark(rowtime, expr)` as the watermark method. 
>> 4) `Schema.column()` accepts `AbstractDataType` instead of `DataType` >> 5) drop Schema#proctime and >> Schema#watermarkFor#boundedOutOfOrderTimestamps >> >> A full example will look like this: >> >> tEnv.createTemporaryTable( >> "MyTable", >> KafkaConnector.newBuilder() >> .version("0.11") >> .topic("user_logs") >> .property("bootstrap.servers", "localhost:9092") >> .property("group.id", "test-group") >> .startFromEarliest() >> .sinkPartitionerRoundRobin() >> .format(JsonFormat.newBuilder().ignoreParseErrors(false).build()) >> .schema( >> Schema.newBuilder() >> .column("user_id", DataTypes.BIGINT()) >> .column("user_name", DataTypes.STRING()) >> .column("score", DataTypes.DECIMAL(10, 2)) >> .column("log_ts", DataTypes.STRING()) >> .column("part_field_0", DataTypes.STRING()) >> .column("part_field_1", DataTypes.INT()) >> .column("proc", proctime()) // define a processing-time >> attribute with column name "proc" >> .column("ts", toTimestamp($("log_ts"))) >> .watermark("ts", $("ts").minus(lit(3).seconds())) >> .primaryKey("user_id") >> .build()) >> .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't >> support partitioned table yet, this is just an example for the API >> .build() >> ); >> >> I hope this resolves all your concerns. Welcome for further feedback! >> >> Updated FLIP: >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API#FLIP129:RefactorDescriptorAPItoregisterconnectorsinTableAPI-FormatDescriptor&FormatDescriptorBuilder >> >> POC: >> >> https://github.com/wuchong/flink/tree/descriptor-POC/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptor3 >> >> Best, >> Jark >> >> On Thu, 16 Jul 2020 at 20:18, Jark Wu <[hidden email]> >> <[hidden email]> wrote: >> >> Thank you all for the discussion! >> >> Here are my comments: >> >> 2) I agree we should support Expression as a computed column. But I'm in >> favor of Leonard's point that maybe we can also support SQL string >> expression as a computed column. >> Because it also keeps aligned with DDL. The concern for Expression is that >> converting Expression to SQL string, or (de)serializing Expression is >> another topic not clear and may involve lots of work. >> Maybe we can support Expression later if time permits. >> >> 6,7) I still prefer the "new" keyword over builder. I don't think >> immutable is a strong reason. I care more about usability and experience >> from users and devs perspective. >> - Users need to type more words if using builder: >> `KafkaConnector.newBuilder()...build()` vs `new KafkaConnector()...` >> - It's more difficult for developers to write a descriptor. 2 classes >> (KafkaConnector and KafkaConnectorBuilder) + 5 more methods (builders, >> schema, partitionedBy, like, etc..). >> With the "new" keyword all the common methods are defined by the >> framework. >> - It's hard to have the same API style for different connectors, >> because >> the common methods are defined by users. For example, some may have >> `withSchema`, `partitionKey`, `withLike`, etc... >> >> 8) I'm -1 to `ConfigOption`. The ConfigOption is not used on `JsonFormat`, >> but the generic `Connector#option`. This doesn't work when using format >> options. >> >> new Connector("kafka") >> .option(JsonOptions.IGNORE_PARSE_ERRORS, true); // this is wrong, >> because "kafka" requires "json.ignore-parse-errors" as the option key, not >> the "ignore-parse-errors". 
>> >> >> ======================================== >> Hi Timo, regarding having a complete new stack, I have thought about that. >> But I still prefer to refactor the existing stack. Reasons: >> Because I think it will be more confusing if users will see two similar >> stacks and may have many problems if using the wrong class. >> For example, we may have two `Schema` and `TableDescriptor` classes. The >> `KafkaConnector` can't be used in legacy `connect()` API, >> the legacy `Kafka` class can't be used in the new `createTemporaryTable()` >> API. >> Besides, the existing API has been deprecated in 1.11, I think it's fine >> to remove them in 1.12. >> >> >> Best, >> Jark >> >> >> On Thu, 16 Jul 2020 at 15:26, Jingsong Li <[hidden email]> >> <[hidden email]> wrote: >> >> Thanks for the discussion. >> >> Descriptor lacks the watermark and the computed column is too long. >> >> 1) +1 for just `column(...)` >> >> 2) +1 for being consistent with Table API, the Java Table API should be >> Expression DSL. We don't need pure string support, users should just use >> DDL instead. I think this is just a schema descriptor? The schema >> descriptor should be consistent with DDL, so, definitely, it should >> contain computed columns information. >> >> 3) +1 for not containing Schema#proctime and >> Schema#watermarkFor#boundedOutOfOrderTimestamps, we can just leave them in >> legacy apis. >> >> 6,7) +1 for removing "new" and builder and making it immutable, For Jark, >> the starting method is the static method, the others are not. >> >> 8) +1 for `ConfigOption`, this can be consistent with `WritableConfig`. >> For Leonard, I don't think user needs “json.fail-on-missing-field” rather >> than “fail-on-missing-field”, user should >> need “fail-on-missing-field” rather than “json.fail-on-missing-field", the >> recommended way is "JsonFormat.newInstance().option(....)", should >> configure options in the format scope. >> >> Best, >> Jingsong >> >> On Wed, Jul 15, 2020 at 9:30 PM Leonard Xu <[hidden email]> >> <[hidden email]> wrote: >> >> Thanks Jark bring this discussion and organize the FLIP document. >> >> Thanks Dawid and Timo for the feedback. Here are my thoughts. >> >> 1) I’m +1 with using column() for both cases. >> >> 2) Expression DSL vs pure SQL string for computed columns >> >> I think we can support them both and implement the pure SQL String first, >> I agree that Expression DSL brings more possibility and flexibility, but >> using SQL string is a more unified way which can reuse most logic with DDL >> like validation and persist in Catalog, >> and Converting Expression DSL to SQL Expression is another big topic and >> I did not figure out a feasible idea until now. >> So, maybe we can postpone the Expression DSL support considered the >> reality. >> >> 3) Methods Schema#proctime and >> Schema#watermarkFor#boundedOutOfOrderTimestamps >> >> +1 with Dawid’s proposal to offer SQL like methods. >> Schema() >> .column("proctime", proctime()); >> .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds())) >> And we can simplify watermarkFor(“colName”, Expression >> watermarkStrategy)to watermark(“colName”, Expression watermarkStrategy), I >> think the later one has can express the meaning of “ WATERMARK FOR >> column_name AS watermark_strategy_expression“ well. >> >> 5)6)7) The new keyword vs the static method vs builder pattern >> >> I have not strong tendency, the new keyword and the static method on >> descriptor can nearly treated as a builder and do same things like >> builder. 
>> For the builder pattern, we will introduce six >> methods(connector.Builder()、connector.Builder.build(), format.Builder(), >> format.Builder.build(), Schema.Builder(),Schema.Builder.build() ),I think >> we could reduce these unnecessary methods. I ‘m slightly +1 for new >> keyword if we need a choice. >> >> 8) `Connector.option(...)` class should also accept `ConfigOption` >> I’m slightly -1 for this, ConfigOption may not work because the key for >> format configOption has not format prefix eg: FAIL_ON_MISSING_FIELD of >> json, we need “json.fail-on-missing-field” rather than >> “fail-on-missing-field”. >> >> public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = >> ConfigOptions >> .key("fail-on-missing-field") >> .booleanType() >> .defaultValue(false) >> >> WDYT? >> >> Best, >> Leonard Xu >> >> >> 在 2020年7月15日,16:37,Timo Walther <[hidden email]> <[hidden email]> >> 写道: >> >> Hi Jark, >> >> thanks for working on this issue. It is time to fix this last part of >> >> inconsistency in the API. I also like the core parts of the FLIP, esp. >> that >> TableDescriptor is one entity that can be passed to different methods. >> Here >> is some feedback from my side: >> >> >> 1) +1 for just `column(...)` >> >> 2) Expression DSL vs pure SQL string for computed columns >> I agree with Dawid. Using the Expression DSL is desireable for a >> >> consistent API. Furthermore, otherwise people need to register functions >> if >> they want to use them in an expression. Refactoring TableSchema is >> definitely on the list for 1.12. Maybe we can come up with some >> intermediate solution where we transform the expression to a SQL >> expression >> for the catalog. Until the discussions around FLIP-80 and >> CatalogTableSchema have been finalized. >> >> >> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps >> We should design the descriptor very close to the SQL syntax. The more >> >> similar the syntax the more likely it is too keep the new descriptor API >> stable. >> >> >> 6) static method vs new keyword >> Actually, the `new` keyword was one of the things that bothered me >> >> most in the old design. Fluent APIs avoid this nowadays. >> >> >> 7) make the descriptors immutable with builders >> The descriptors are some kind of builders already. But they are not >> >> called "builder". Instead of coming up with the new concept of a >> "descriptor", we should use terminology that people esp. Java/Scala users >> are familiar with already. >> >> >> We could make the descriptors immutable to pass them around easily. >> >> Btw "Connector" and "Format" should always be in the classname. This >> >> was also a mistake in the past. Instead of calling the descriptor just >> `Kafka` we could call it `KafkaConnector`. An entire example could look >> like: >> >> >> tEnv.createTemporaryTable( >> "OrdersInKafka", >> KafkaConnector.newBuilder() // builder pattern supported by IDE >> .topic("user_logs") >> .property("bootstrap.servers", "localhost:9092") >> .property("group.id", "test-group") >> .format(JsonFormat.newInstance()) // shortcut for no parameters >> .schema( >> Schema.newBuilder() >> .column("user_id", DataTypes.BIGINT()) >> .column("score", DataTypes.DECIMAL(10, 2)) >> .column("log_ts", DataTypes.TIMESTAMP(3)) >> .column("my_ts", toTimestamp($("log_ts")) >> .build() >> ) >> .build() >> ); >> >> Instead of refacoring the existing classes, we could also think about >> >> a completly new stack. I think this would avoid confusion for the old >> users. 
We could deprecate the entire `Kafka` class instead of dealing with >> backwards compatibility. >> >> >> 8) minor extensions >> A general `Connector.option(...)` class should also accept >> >> `ConfigOption` instead of only strings. >> >> A `Schema.column()` should accept `AbstractDataType` that can be >> >> resolved to a `DataType` by access to a `DataTypeFactory`. >> >> >> What do you think? >> >> Thanks, >> Timo >> >> >> On 09.07.20 18:51, Jark Wu wrote: >> >> Hi Dawid, >> Thanks for the great feedback! Here are my responses: >> 1) computedColumn(..) vs column(..) >> I'm fine to use `column(..)` in both cases. >> 2) Expression DSL vs pure SQL string for computed columns >> This is a good point. Actually, I also prefer to use Expression DSL >> >> because >> >> this is more Table API style. >> However, this requires to modify TableSchema again to accept & expose >> Expression as computed columns. >> I'm not convinced about this, because AFAIK, we want to have a >> CatalogTableSchema to hold this information >> and don't want to extend TableSchema. Maybe Timo can give some points >> >> here. >> >> Besides, this will make the descriptor API can't be persisted in >> >> Catalog >> >> unless FLIP-80 is done. >> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps >> The original intention behind these APIs are providing shortcut APIs >> >> for >> >> Table API users. >> But I'm also fine to only provide the DDL-like methods if you have >> concerns. We can discuss shortcuts in the future if users request. >> 4) LikeOption >> LikeOption.INCLUDING.ALL is a constant (enum values). I have added >> >> more >> >> description about this in the FLIP. >> 5) implementation? >> I don't want to mention too much about implementation details in the >> >> FLIP >> >> at the beginning, because the API is already very long. >> But I also added an "Implementation" section to explain them. >> 6) static method vs new keyword >> Personally I prefer the new keyword because it makes the API cleaner. >> >> If we >> >> want remove new keyword and use static methods, we have to: >> Either adding a `Schema.builder()/create()` method as the starting >> >> method, >> >> Or duplicating all the methods as static methods, e.g. we have 12 >> >> methods >> >> in `Kafka`, any of them can be a starting method, then we will have 24 >> methods in `Kafka`. >> Both are not good, and it's hard to keep all the descriptors having >> >> the >> >> same starting method name, but all the descriptors can start from the >> >> same >> >> new keyword. >> Best, >> Jark >> On Thu, 9 Jul 2020 at 15:48, Dawid Wysakowicz <[hidden email] >> >> >> wrote: >> >> Correction to my point 4. The example is correct. I did not read it >> carefully enough. Sorry for the confusion. Nevertheless I'd still >> >> like >> >> to see a bit more explanation on the LikeOptions. >> >> On 07/07/2020 04:32, Jark Wu wrote: >> >> Hi everyone, >> >> Leonard and I prepared a FLIP about refactoring current Descriptor >> >> API, >> >> i.e. TableEnvironment#connect(). We would like to propose a new >> >> descriptor >> >> API to register connectors in Table API. >> >> Since Flink 1.9, the community focused more on the new SQL DDL >> >> feature. >> >> After a series of releases, the SQL DDL is powerful and has many >> >> rich >> >> features now. However, Descriptor API (the >> >> `TableEnvironment#connect()`) >> >> has been stagnant for a long time and missing lots of core >> >> features, such >> >> as computed columns and primary keys. 
That's frustrating for Table >> >> API >> >> users who want to register tables programmatically. Besides, >> >> currently, a >> >> connector must implement a corresponding Descriptor (e.g. `new >> >> Kafka()`) >> >> before using the "connect" API. Therefore, we hope to reduce this >> >> effort >> >> for connector developers, that custom source/sinks can be >> >> registered via >> >> the descriptor API without implementing a Descriptor. >> >> These are the problems we want to resolve in this FLIP. I'm looking >> >> forward >> >> to your comments. >> >> >> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connector+for+Table+API >> >> >> Best, >> Jark >> >> >> >> >> >> >> >> -- >> Best, Jingsong Lee
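One note on Dawid's LikeOption sketch quoted above: the overwriting(...) factory seems to carry a copy-paste slip, constructing MergingStrategy.INCLUDING instead of OVERWRITING. A corrected minimal version could look as follows (an illustrative sketch; the private constructor is filled in as an assumption, and restricting OVERWRITING to GENERATED and OPTIONS follows the flat enum Jark lists later in the thread):

    import org.apache.flink.util.Preconditions;

    public class LikeOption {

        public enum MergingStrategy { INCLUDING, EXCLUDING, OVERWRITING }

        public enum FeatureOption { ALL, CONSTRAINTS, GENERATED, OPTIONS, PARTITIONS, WATERMARKS }

        private final MergingStrategy mergingStrategy;
        private final FeatureOption featureOption;

        private LikeOption(MergingStrategy mergingStrategy, FeatureOption featureOption) {
            this.mergingStrategy = mergingStrategy;
            this.featureOption = featureOption;
        }

        public static LikeOption including(FeatureOption option) {
            return new LikeOption(MergingStrategy.INCLUDING, option);
        }

        public static LikeOption overwriting(FeatureOption option) {
            // only generated columns and options can be overwritten
            Preconditions.checkArgument(
                option == FeatureOption.GENERATED || option == FeatureOption.OPTIONS,
                "OVERWRITING is only supported for GENERATED and OPTIONS.");
            return new LikeOption(MergingStrategy.OVERWRITING, option);
        }
    }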
Thanks for the update.
The FLIP looks good, I think it is time to vote. Best, Jingsong On Fri, Jul 24, 2020 at 3:21 PM Dawid Wysakowicz <[hidden email]> wrote: > Hi Jark, > > Ad. 1 > > Personally I don't see any benefit of having the specific descriptors > (KafkaDescriptor/ElasticDescriptor/...). E.g. the KafkaDescriptor would > not differ at all from other descriptors. I don't think returning always > a TableDescriptor would be confusing for users. Therefore I am very much > in favour of simplifying it. > > Ad. 2 > > Honestly, here we are discussing personal preferences. Let's go with > your original suggestion. I am not strong on this. > > Ad. 3 > > My personal take is that in this case the digest should not include a > table identifier as we don't have one. I could see the digest being > calculated from the properties/CatalogTable. That way you would have the > same digest for all inline tables with the same properties. In your > approach two scans of the same descriptor will have different digests. > (I am not saying this is particularly wrong, but different from > scan("test") ). Similarly, we do not register filters/joins/groupings > under generated names just to compute the digest. One additional problem > you will have is you will need to add handling of that special tables > when e.g. listing tables. > > Nevertheless I don't want to block the progress on this. I'd appreciate > though if you consider that again during the implementation. > > Best, > > Dawid > > On 23/07/2020 16:56, Jark Wu wrote: > > Hi Timo, > > > > That's a good point I missed in the design. I have updated the FLIP and > > added a note under the `KafkaConnector` to mention this. > > I will not list all the method names in the FLIP as the design doc is > super > > long now. > > > > ================================================================ > > Hi Dawid, > > > > 1) KafkaConnector not extends TableDescriptor > > The reason why KafkaConnector extends TableDescriptor is that, a builder > > pattern "KafkaConnector.newBuilder()...build()" should return > > "KafkaConnector" in theory. > > So users can write something like the following code which might be > > more intuitive. > > > > KafkaConnector kafka = KafkaConnector.newBuilder()...build(); > > tEnv.createTemporaryTable("MyTable", kafka); > > > > But I agree connector implementation will be simpler if this is not > > strongly needed, e.g. we don't need the generic type for descriptor, > > we don't need to pass the descriptor class in the builder. So I'm also > fine > > to not extend it if others don't against it. What's your opinion here > @Timo > > Walther <[hidden email]> ? > > > > 2) LikeOptions > > I am not very satisfied with the new design. Because the API is not very > > fluent. Users will be interrupted to consider what the `overwrite()` > > parameter to be. > > And the API design doesn't protect users from using the wrong options > > before running the code. > > What about to list all possible options in one level? This will be more > > aligned with SQL DDL and easy to understand and use for users. > > > > public enum LikeOption { > > INCLUDING_ALL, > > INCLUDING_CONSTRAINTS, > > INCLUDING_GENERATED, > > INCLUDING_OPTIONS, > > INCLUDING_PARTITIONS, > > INCLUDING_WATERMARKS, > > > > EXCLUDING_ALL, > > EXCLUDING_CONSTRAINTS, > > EXCLUDING_GENERATED, > > EXCLUDING_OPTIONS, > > EXCLUDING_PARTITIONS, > > EXCLUDING_WATERMARKS, > > > > OVERWRITING_GENERATED, > > OVERWRITING_OPTIONS > > } > > > > 3) register the table under a generated table path > > I'm afraid we have to do that. 
The generated table path is still needed > for > > `TableSourceTable#tableIdentifier` which is used to calculate the digest. > > This requires that the registered table must have an unique identifier. > The > > old `TableSourceQueryOperation` will also generate the identifier > according > > to the hashcode of the TableSource object. However, the generated > > identifier "Unregistered_TableSource_1234" is still possible to be in > > conflict with > > the user's table path. Therefore, I prefer to register the generated name > > in the (temporary) catalog to throw explicit exceptions, rather than > > generating a wrong plan. > > > > ================================================================ > > Hi @Leonard Xu <[hidden email]> and @Jingsong Li < > [hidden email]> > > , > > > > Do you have other concerns on the latest FLIP and the above discussion? > > > > Best, > > Jark > > > > On Thu, 23 Jul 2020 at 18:26, Dawid Wysakowicz <[hidden email]> > > wrote: > > > >> Hi Jark, > >> > >> Thanks for the update. I think the FLIP looks really well on the high > >> level. > >> > >> I have a few comments to the code structure in the FLIP: > >> > >> 1) I really don't like how the TableDescriptor exposes protected fields. > >> Moreover why do we need to extend from it? I don't think we need > >> KafkaConnector extends TableDescriptor and alike. We only need the > builders > >> e.g. the KafkaConnectorBuilder. > >> > >> If I understand it correctly this is the interface needed from the > >> TableEnvironment perspective and it is the contract that the > >> TableEnvironment expects. I would suggest making it an interface: > >> @PublicEvolving > >> public interface TableDescriptor { > >> List<String> getPartitionedFields(); > >> Schema getSchema(); > >> Map<String, String> getOptions(); > >> LikeOption[] getLikeOptions(); > >> String getLikePath(); > >> } > >> > >> Then the TableDescriptorBuilder would work with an internal > implementation > >> of this interface > >> @PublicEvolving > >> public abstract class TableDescriptorBuilder<BUILDER extends > TableDescriptorBuilder<BUILDER>> > >> { > >> > >> private final InternalTableDescriptor descriptor = new > >> InternalTableDescriptor(); > >> > >> /** > >> * Returns the this builder instance in the type of subclass. > >> */ > >> protected abstract BUILDER self(); > >> > >> /** > >> * Specifies the table schema. > >> */ > >> public BUILDER schema(Schema schema) { > >> descriptor.schema = schema; > >> return self(); > >> } > >> > >> /** > >> * Specifies the partition keys of this table. > >> */ > >> public BUILDER partitionedBy(String... fieldNames) { > >> checkArgument(descriptor.partitionedFields.isEmpty(), > "partitionedBy(...) > >> shouldn't be called more than once."); > >> descriptor.partitionedFields.addAll(Arrays.asList(fieldNames)); > >> return self(); > >> } > >> > >> /** > >> * Extends some parts from the original registered table path. > >> */ > >> public BUILDER like(String tablePath, LikeOption... likeOptions) { > >> descriptor.likePath = tablePath; > >> descriptor.likeOptions = likeOptions; > >> return self(); > >> } > >> > >> protected BUILDER option(String key, String value) { > >> descriptor.options.put(key, value); > >> return self(); > >> } > >> > >> /** > >> * Returns created table descriptor. > >> */ > >> public TableDescriptor build() { > >> return descriptor; > >> } > >> } > >> > >> > >> 2) I'm also not the biggest fun of how the LikeOptions are suggested in > >> the doc. 
Can't we have something more like > >> > >> class LikeOption { > >> > >> public enum MergingStrategy { > >> INCLUDING, > >> EXCLUDING, > >> OVERWRITING > >> } > >> > >> public enum FeatureOption { > >> ALL, > >> CONSTRAINTS, > >> GENERATED, > >> OPTIONS, > >> PARTITIONS, > >> WATERMARKS > >> } > >> > >> private final MergingStrategy mergingStrategy; > >> private final FeatureOption featureOption; > >> > >> > >> public static final LikeOption including(FeatureOption option) { > >> > >> return new LikeOption(MergingStrategy.INCLUDING, option); > >> > >> } > >> > >> public static final LikeOption overwriting(FeatureOption option) { > >> > >> Preconditions.checkArgument(option != ALL && ...); > >> > >> return new LikeOption(MergingStrategy.INCLUDING, option); > >> > >> } > >> > >> } > >> > >> > >> 3) TableEnvironment#from(descriptor) will register descriptor under a > >> system generated table path (just like TableImpl#toString) first, and > scan > >> from the table path to derive the Table. Table#executeInsert() does it > in > >> the similar way. > >> > >> I would try not to register the table under a generated table path. Do > we > >> really need that? I am pretty sure we can use the tables without > >> registering them in a catalog. Similarly to the old > >> TableSourceQueryOperation. > >> > >> Otherwise looks good > >> > >> Best, > >> > >> Dawid > >> > >> On 23/07/2020 10:35, Timo Walther wrote: > >> > >> Hi Jark, > >> > >> thanks for the update. I think the FLIP is in a really good shape now > and > >> ready to be voted. If others have no further comments? > >> > >> I have one last comment around the methods of the descriptor builders. > >> When refactoring classes such as `KafkaConnector` or > >> `ElasticsearchConnector`. We should align the method names with the new > >> property names introduced in FLIP-122: > >> > >> KafkaConnector.newBuilder() > >> // similar to scan.startup.mode=earliest-offset > >> .scanStartupModeEarliest() > >> // similar to sink.partitioner=round-robin > >> .sinkPartitionerRoundRobin() > >> > >> What do you think? > >> > >> Thanks for driving this, > >> Timo > >> > >> > >> On 22.07.20 17:26, Jark Wu wrote: > >> > >> Hi all, > >> > >> After some offline discussion with other people, I'm also fine with > using > >> the builder pattern now, > >> even though I still think the `.build()` method is a little verbose in > >> the > >> user code. > >> > >> I have updated the FLIP with following changes: > >> > >> 1) use builder pattern instead of "new" keyword. In order to avoid > >> duplicate code and reduce development burden for connector developers, > >> I introduced abstract classes `TableDescriptorBuilder` and > >> `FormatDescriptorBuilder`. > >> All the common methods are pre-defined in the base builder class, > all > >> the custom descriptor builder should extend from the base builder > classes. > >> And we can add more methods into the base builder class in the > future > >> without changes in the connectors. > >> 2) use Expression instead of SQL expression string for computed column > and > >> watermark strategy > >> 3) use `watermark(rowtime, expr)` as the watermark method. 
> >> 4) `Schema.column()` accepts `AbstractDataType` instead of `DataType` > >> 5) drop Schema#proctime and > >> Schema#watermarkFor#boundedOutOfOrderTimestamps > >> > >> A full example will look like this: > >> > >> tEnv.createTemporaryTable( > >> "MyTable", > >> KafkaConnector.newBuilder() > >> .version("0.11") > >> .topic("user_logs") > >> .property("bootstrap.servers", "localhost:9092") > >> .property("group.id", "test-group") > >> .startFromEarliest() > >> .sinkPartitionerRoundRobin() > >> > .format(JsonFormat.newBuilder().ignoreParseErrors(false).build()) > >> .schema( > >> Schema.newBuilder() > >> .column("user_id", DataTypes.BIGINT()) > >> .column("user_name", DataTypes.STRING()) > >> .column("score", DataTypes.DECIMAL(10, 2)) > >> .column("log_ts", DataTypes.STRING()) > >> .column("part_field_0", DataTypes.STRING()) > >> .column("part_field_1", DataTypes.INT()) > >> .column("proc", proctime()) // define a processing-time > >> attribute with column name "proc" > >> .column("ts", toTimestamp($("log_ts"))) > >> .watermark("ts", $("ts").minus(lit(3).seconds())) > >> .primaryKey("user_id") > >> .build()) > >> .partitionedBy("part_field_0", "part_field_1") // Kafka > doesn't > >> support partitioned table yet, this is just an example for the API > >> .build() > >> ); > >> > >> I hope this resolves all your concerns. Welcome for further feedback! > >> > >> Updated FLIP: > >> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API#FLIP129:RefactorDescriptorAPItoregisterconnectorsinTableAPI-FormatDescriptor&FormatDescriptorBuilder > >> > >> POC: > >> > >> > https://github.com/wuchong/flink/tree/descriptor-POC/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptor3 > >> > >> Best, > >> Jark > >> > >> On Thu, 16 Jul 2020 at 20:18, Jark Wu <[hidden email]> > >> <[hidden email]> wrote: > >> > >> Thank you all for the discussion! > >> > >> Here are my comments: > >> > >> 2) I agree we should support Expression as a computed column. But I'm in > >> favor of Leonard's point that maybe we can also support SQL string > >> expression as a computed column. > >> Because it also keeps aligned with DDL. The concern for Expression is > that > >> converting Expression to SQL string, or (de)serializing Expression is > >> another topic not clear and may involve lots of work. > >> Maybe we can support Expression later if time permits. > >> > >> 6,7) I still prefer the "new" keyword over builder. I don't think > >> immutable is a strong reason. I care more about usability and experience > >> from users and devs perspective. > >> - Users need to type more words if using builder: > >> `KafkaConnector.newBuilder()...build()` vs `new KafkaConnector()...` > >> - It's more difficult for developers to write a descriptor. 2 > classes > >> (KafkaConnector and KafkaConnectorBuilder) + 5 more methods (builders, > >> schema, partitionedBy, like, etc..). > >> With the "new" keyword all the common methods are defined by the > >> framework. > >> - It's hard to have the same API style for different connectors, > >> because > >> the common methods are defined by users. For example, some may have > >> `withSchema`, `partitionKey`, `withLike`, etc... > >> > >> 8) I'm -1 to `ConfigOption`. The ConfigOption is not used on > `JsonFormat`, > >> but the generic `Connector#option`. This doesn't work when using format > >> options. 
> >> > >> new Connector("kafka") > >> .option(JsonOptions.IGNORE_PARSE_ERRORS, true); // this is wrong, > >> because "kafka" requires "json.ignore-parse-errors" as the option key, > not > >> the "ignore-parse-errors". > >> > >> > >> ======================================== > >> Hi Timo, regarding having a complete new stack, I have thought about > that. > >> But I still prefer to refactor the existing stack. Reasons: > >> Because I think it will be more confusing if users will see two similar > >> stacks and may have many problems if using the wrong class. > >> For example, we may have two `Schema` and `TableDescriptor` classes. The > >> `KafkaConnector` can't be used in legacy `connect()` API, > >> the legacy `Kafka` class can't be used in the new > `createTemporaryTable()` > >> API. > >> Besides, the existing API has been deprecated in 1.11, I think it's fine > >> to remove them in 1.12. > >> > >> > >> Best, > >> Jark > >> > >> > >> On Thu, 16 Jul 2020 at 15:26, Jingsong Li <[hidden email]> > >> <[hidden email]> wrote: > >> > >> Thanks for the discussion. > >> > >> Descriptor lacks the watermark and the computed column is too long. > >> > >> 1) +1 for just `column(...)` > >> > >> 2) +1 for being consistent with Table API, the Java Table API should be > >> Expression DSL. We don't need pure string support, users should just use > >> DDL instead. I think this is just a schema descriptor? The schema > >> descriptor should be consistent with DDL, so, definitely, it should > >> contain computed columns information. > >> > >> 3) +1 for not containing Schema#proctime and > >> Schema#watermarkFor#boundedOutOfOrderTimestamps, we can just leave them > in > >> legacy apis. > >> > >> 6,7) +1 for removing "new" and builder and making it immutable, For > Jark, > >> the starting method is the static method, the others are not. > >> > >> 8) +1 for `ConfigOption`, this can be consistent with `WritableConfig`. > >> For Leonard, I don't think user needs “json.fail-on-missing-field” > rather > >> than “fail-on-missing-field”, user should > >> need “fail-on-missing-field” rather than “json.fail-on-missing-field", > the > >> recommended way is "JsonFormat.newInstance().option(....)", should > >> configure options in the format scope. > >> > >> Best, > >> Jingsong > >> > >> On Wed, Jul 15, 2020 at 9:30 PM Leonard Xu <[hidden email]> > >> <[hidden email]> wrote: > >> > >> Thanks Jark bring this discussion and organize the FLIP document. > >> > >> Thanks Dawid and Timo for the feedback. Here are my thoughts. > >> > >> 1) I’m +1 with using column() for both cases. > >> > >> 2) Expression DSL vs pure SQL string for computed columns > >> > >> I think we can support them both and implement the pure SQL String > first, > >> I agree that Expression DSL brings more possibility and flexibility, but > >> using SQL string is a more unified way which can reuse most logic with > DDL > >> like validation and persist in Catalog, > >> and Converting Expression DSL to SQL Expression is another big topic and > >> I did not figure out a feasible idea until now. > >> So, maybe we can postpone the Expression DSL support considered the > >> reality. > >> > >> 3) Methods Schema#proctime and > >> Schema#watermarkFor#boundedOutOfOrderTimestamps > >> > >> +1 with Dawid’s proposal to offer SQL like methods. 
> >> Schema() > >> .column("proctime", proctime()); > >> .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds())) > >> And we can simplify watermarkFor(“colName”, Expression > >> watermarkStrategy)to watermark(“colName”, Expression > watermarkStrategy), I > >> think the later one has can express the meaning of “ WATERMARK FOR > >> column_name AS watermark_strategy_expression“ well. > >> > >> 5)6)7) The new keyword vs the static method vs builder pattern > >> > >> I have not strong tendency, the new keyword and the static method on > >> descriptor can nearly treated as a builder and do same things like > >> builder. > >> For the builder pattern, we will introduce six > >> methods(connector.Builder()、connector.Builder.build(), format.Builder(), > >> format.Builder.build(), Schema.Builder(),Schema.Builder.build() ),I > think > >> we could reduce these unnecessary methods. I ‘m slightly +1 for new > >> keyword if we need a choice. > >> > >> 8) `Connector.option(...)` class should also accept `ConfigOption` > >> I’m slightly -1 for this, ConfigOption may not work because the key for > >> format configOption has not format prefix eg: FAIL_ON_MISSING_FIELD of > >> json, we need “json.fail-on-missing-field” rather than > >> “fail-on-missing-field”. > >> > >> public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = > >> ConfigOptions > >> .key("fail-on-missing-field") > >> .booleanType() > >> .defaultValue(false) > >> > >> WDYT? > >> > >> Best, > >> Leonard Xu > >> > >> > >> 在 2020年7月15日,16:37,Timo Walther <[hidden email]> < > [hidden email]> > >> 写道: > >> > >> Hi Jark, > >> > >> thanks for working on this issue. It is time to fix this last part of > >> > >> inconsistency in the API. I also like the core parts of the FLIP, esp. > >> that > >> TableDescriptor is one entity that can be passed to different methods. > >> Here > >> is some feedback from my side: > >> > >> > >> 1) +1 for just `column(...)` > >> > >> 2) Expression DSL vs pure SQL string for computed columns > >> I agree with Dawid. Using the Expression DSL is desireable for a > >> > >> consistent API. Furthermore, otherwise people need to register functions > >> if > >> they want to use them in an expression. Refactoring TableSchema is > >> definitely on the list for 1.12. Maybe we can come up with some > >> intermediate solution where we transform the expression to a SQL > >> expression > >> for the catalog. Until the discussions around FLIP-80 and > >> CatalogTableSchema have been finalized. > >> > >> > >> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps > >> We should design the descriptor very close to the SQL syntax. The more > >> > >> similar the syntax the more likely it is too keep the new descriptor API > >> stable. > >> > >> > >> 6) static method vs new keyword > >> Actually, the `new` keyword was one of the things that bothered me > >> > >> most in the old design. Fluent APIs avoid this nowadays. > >> > >> > >> 7) make the descriptors immutable with builders > >> The descriptors are some kind of builders already. But they are not > >> > >> called "builder". Instead of coming up with the new concept of a > >> "descriptor", we should use terminology that people esp. Java/Scala > users > >> are familiar with already. > >> > >> > >> We could make the descriptors immutable to pass them around easily. > >> > >> Btw "Connector" and "Format" should always be in the classname. This > >> > >> was also a mistake in the past. 
Instead of calling the descriptor just > >> `Kafka` we could call it `KafkaConnector`. An entire example could look > >> like: > >> > >> > >> tEnv.createTemporaryTable( > >> "OrdersInKafka", > >> KafkaConnector.newBuilder() // builder pattern supported by IDE > >> .topic("user_logs") > >> .property("bootstrap.servers", "localhost:9092") > >> .property("group.id", "test-group") > >> .format(JsonFormat.newInstance()) // shortcut for no parameters > >> .schema( > >> Schema.newBuilder() > >> .column("user_id", DataTypes.BIGINT()) > >> .column("score", DataTypes.DECIMAL(10, 2)) > >> .column("log_ts", DataTypes.TIMESTAMP(3)) > >> .column("my_ts", toTimestamp($("log_ts")) > >> .build() > >> ) > >> .build() > >> ); > >> > >> Instead of refacoring the existing classes, we could also think about > >> > >> a completly new stack. I think this would avoid confusion for the old > >> users. We could deprecate the entire `Kafka` class instead of dealing > with > >> backwards compatibility. > >> > >> > >> 8) minor extensions > >> A general `Connector.option(...)` class should also accept > >> > >> `ConfigOption` instead of only strings. > >> > >> A `Schema.column()` should accept `AbstractDataType` that can be > >> > >> resolved to a `DataType` by access to a `DataTypeFactory`. > >> > >> > >> What do you think? > >> > >> Thanks, > >> Timo > >> > >> > >> On 09.07.20 18:51, Jark Wu wrote: > >> > >> Hi Dawid, > >> Thanks for the great feedback! Here are my responses: > >> 1) computedColumn(..) vs column(..) > >> I'm fine to use `column(..)` in both cases. > >> 2) Expression DSL vs pure SQL string for computed columns > >> This is a good point. Actually, I also prefer to use Expression DSL > >> > >> because > >> > >> this is more Table API style. > >> However, this requires to modify TableSchema again to accept & expose > >> Expression as computed columns. > >> I'm not convinced about this, because AFAIK, we want to have a > >> CatalogTableSchema to hold this information > >> and don't want to extend TableSchema. Maybe Timo can give some points > >> > >> here. > >> > >> Besides, this will make the descriptor API can't be persisted in > >> > >> Catalog > >> > >> unless FLIP-80 is done. > >> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps > >> The original intention behind these APIs are providing shortcut APIs > >> > >> for > >> > >> Table API users. > >> But I'm also fine to only provide the DDL-like methods if you have > >> concerns. We can discuss shortcuts in the future if users request. > >> 4) LikeOption > >> LikeOption.INCLUDING.ALL is a constant (enum values). I have added > >> > >> more > >> > >> description about this in the FLIP. > >> 5) implementation? > >> I don't want to mention too much about implementation details in the > >> > >> FLIP > >> > >> at the beginning, because the API is already very long. > >> But I also added an "Implementation" section to explain them. > >> 6) static method vs new keyword > >> Personally I prefer the new keyword because it makes the API cleaner. > >> > >> If we > >> > >> want remove new keyword and use static methods, we have to: > >> Either adding a `Schema.builder()/create()` method as the starting > >> > >> method, > >> > >> Or duplicating all the methods as static methods, e.g. we have 12 > >> > >> methods > >> > >> in `Kafka`, any of them can be a starting method, then we will have 24 > >> methods in `Kafka`. 
> >> Both are not good, and it's hard to keep all the descriptors having > >> > >> the > >> > >> same starting method name, but all the descriptors can start from the > >> > >> same > >> > >> new keyword. > >> Best, > >> Jark > >> On Thu, 9 Jul 2020 at 15:48, Dawid Wysakowicz <[hidden email] > >> > >> > >> wrote: > >> > >> Correction to my point 4. The example is correct. I did not read it > >> carefully enough. Sorry for the confusion. Nevertheless I'd still > >> > >> like > >> > >> to see a bit more explanation on the LikeOptions. > >> > >> On 07/07/2020 04:32, Jark Wu wrote: > >> > >> Hi everyone, > >> > >> Leonard and I prepared a FLIP about refactoring current Descriptor > >> > >> API, > >> > >> i.e. TableEnvironment#connect(). We would like to propose a new > >> > >> descriptor > >> > >> API to register connectors in Table API. > >> > >> Since Flink 1.9, the community focused more on the new SQL DDL > >> > >> feature. > >> > >> After a series of releases, the SQL DDL is powerful and has many > >> > >> rich > >> > >> features now. However, Descriptor API (the > >> > >> `TableEnvironment#connect()`) > >> > >> has been stagnant for a long time and missing lots of core > >> > >> features, such > >> > >> as computed columns and primary keys. That's frustrating for Table > >> > >> API > >> > >> users who want to register tables programmatically. Besides, > >> > >> currently, a > >> > >> connector must implement a corresponding Descriptor (e.g. `new > >> > >> Kafka()`) > >> > >> before using the "connect" API. Therefore, we hope to reduce this > >> > >> effort > >> > >> for connector developers, that custom source/sinks can be > >> > >> registered via > >> > >> the descriptor API without implementing a Descriptor. > >> > >> These are the problems we want to resolve in this FLIP. I'm looking > >> > >> forward > >> > >> to your comments. > >> > >> > >> > >> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connector+for+Table+API > >> > >> > >> Best, > >> Jark > >> > >> > >> > >> > >> > >> > >> > >> -- > >> Best, Jingsong Lee > >> > >> > >> > >> > >> > > -- Best, Jingsong Lee |
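To make the abstract TableDescriptorBuilder quoted above a bit more concrete, a connector-specific builder could extend it roughly as follows (an illustrative sketch; the connector identifier "kafka" and the option keys are assumptions, with the method names borrowed from Timo's FLIP-122 suggestion earlier in the thread):

    public class KafkaConnectorBuilder extends TableDescriptorBuilder<KafkaConnectorBuilder> {

        public KafkaConnectorBuilder() {
            // assumed connector identifier option
            option("connector", "kafka");
        }

        @Override
        protected KafkaConnectorBuilder self() {
            return this;
        }

        public KafkaConnectorBuilder topic(String topic) {
            return option("topic", topic);
        }

        // similar to scan.startup.mode=earliest-offset
        public KafkaConnectorBuilder scanStartupModeEarliest() {
            return option("scan.startup.mode", "earliest-offset");
        }

        // similar to sink.partitioner=round-robin
        public KafkaConnectorBuilder sinkPartitionerRoundRobin() {
            return option("sink.partitioner", "round-robin");
        }
    }

The common methods (schema, partitionedBy, like, build) come from the base class, so a connector author only has to define the connector-specific options.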
Thanks Jark for the update.
The latest FLIP looks well. I like Dawid’s proposal of TableDescriptor. Best Leonard Xu > 在 2020年7月23日,22:56,Jark Wu <[hidden email]> 写道: > > Hi Timo, > > That's a good point I missed in the design. I have updated the FLIP and added a note under the `KafkaConnector` to mention this. > I will not list all the method names in the FLIP as the design doc is super long now. > > ================================================================ > Hi Dawid, > > 1) KafkaConnector not extends TableDescriptor > The reason why KafkaConnector extends TableDescriptor is that, a builder pattern "KafkaConnector.newBuilder()...build()" should return "KafkaConnector" in theory. > So users can write something like the following code which might be more intuitive. > > KafkaConnector kafka = KafkaConnector.newBuilder()...build(); > tEnv.createTemporaryTable("MyTable", kafka); > > But I agree connector implementation will be simpler if this is not strongly needed, e.g. we don't need the generic type for descriptor, > we don't need to pass the descriptor class in the builder. So I'm also fine to not extend it if others don't against it. What's your opinion here @Timo Walther <mailto:[hidden email]> ? > > 2) LikeOptions > I am not very satisfied with the new design. Because the API is not very fluent. Users will be interrupted to consider what the `overwrite()` parameter to be. > And the API design doesn't protect users from using the wrong options before running the code. > What about to list all possible options in one level? This will be more aligned with SQL DDL and easy to understand and use for users. > > public enum LikeOption { > INCLUDING_ALL, > INCLUDING_CONSTRAINTS, > INCLUDING_GENERATED, > INCLUDING_OPTIONS, > INCLUDING_PARTITIONS, > INCLUDING_WATERMARKS, > > EXCLUDING_ALL, > EXCLUDING_CONSTRAINTS, > EXCLUDING_GENERATED, > EXCLUDING_OPTIONS, > EXCLUDING_PARTITIONS, > EXCLUDING_WATERMARKS, > > OVERWRITING_GENERATED, > OVERWRITING_OPTIONS > } > > 3) register the table under a generated table path > I'm afraid we have to do that. The generated table path is still needed for `TableSourceTable#tableIdentifier` which is used to calculate the digest. > This requires that the registered table must have an unique identifier. The old `TableSourceQueryOperation` will also generate the identifier according > to the hashcode of the TableSource object. However, the generated identifier "Unregistered_TableSource_1234" is still possible to be in conflict with > the user's table path. Therefore, I prefer to register the generated name in the (temporary) catalog to throw explicit exceptions, rather than generating a wrong plan. > > ================================================================ > Hi @Leonard Xu <mailto:[hidden email]> and @Jingsong Li <mailto:[hidden email]> , > > Do you have other concerns on the latest FLIP and the above discussion? > > Best, > Jark > > On Thu, 23 Jul 2020 at 18:26, Dawid Wysakowicz <[hidden email] <mailto:[hidden email]>> wrote: > Hi Jark, > > Thanks for the update. I think the FLIP looks really well on the high level. > > I have a few comments to the code structure in the FLIP: > > 1) I really don't like how the TableDescriptor exposes protected fields. Moreover why do we need to extend from it? I don't think we need KafkaConnector extends TableDescriptor and alike. We only need the builders e.g. the KafkaConnectorBuilder. > > If I understand it correctly this is the interface needed from the TableEnvironment perspective and it is the contract that the TableEnvironment expects. 
I would suggest making it an interface: > > @PublicEvolving > public interface TableDescriptor { > List<String> getPartitionedFields(); > Schema getSchema(); > Map<String, String> getOptions(); > LikeOption[] getLikeOptions(); > String getLikePath(); > } > > Then the TableDescriptorBuilder would work with an internal implementation of this interface > > @PublicEvolving > public abstract class TableDescriptorBuilder<BUILDER extends TableDescriptorBuilder<BUILDER>> { > > private final InternalTableDescriptor descriptor = new InternalTableDescriptor(); > > /** > * Returns the this builder instance in the type of subclass. > */ > protected abstract BUILDER self(); > > /** > * Specifies the table schema. > */ > public BUILDER schema(Schema schema) { > descriptor.schema = schema; > return self(); > } > > /** > * Specifies the partition keys of this table. > */ > public BUILDER partitionedBy(String... fieldNames) { > checkArgument(descriptor.partitionedFields.isEmpty(), "partitionedBy(...) shouldn't be called more than once."); > descriptor.partitionedFields.addAll(Arrays.asList(fieldNames)); > return self(); > } > > /** > * Extends some parts from the original registered table path. > */ > public BUILDER like(String tablePath, LikeOption... likeOptions) { > descriptor.likePath = tablePath; > descriptor.likeOptions = likeOptions; > return self(); > } > > protected BUILDER option(String key, String value) { > descriptor.options.put(key, value); > return self(); > } > > /** > * Returns created table descriptor. > */ > public TableDescriptor build() { > return descriptor; > } > } > > > 2) I'm also not the biggest fun of how the LikeOptions are suggested in the doc. Can't we have something more like > > class LikeOption { > > public enum MergingStrategy { > INCLUDING, > EXCLUDING, > OVERWRITING > } > > public enum FeatureOption { > ALL, > CONSTRAINTS, > GENERATED, > OPTIONS, > PARTITIONS, > WATERMARKS > } > > private final MergingStrategy mergingStrategy; > private final FeatureOption featureOption; > > > > public static final LikeOption including(FeatureOption option) { > > return new LikeOption(MergingStrategy.INCLUDING, option); > > } > > public static final LikeOption overwriting(FeatureOption option) { > > Preconditions.checkArgument(option != ALL && ...); > > return new LikeOption(MergingStrategy.INCLUDING, option); > > } > > } > > > > 3) TableEnvironment#from(descriptor) will register descriptor under a system generated table path (just like TableImpl#toString) first, and scan from the table path to derive the Table. Table#executeInsert() does it in the similar way. > > I would try not to register the table under a generated table path. Do we really need that? I am pretty sure we can use the tables without registering them in a catalog. Similarly to the old TableSourceQueryOperation. > > Otherwise looks good > > Best, > > Dawid > > > On 23/07/2020 10:35, Timo Walther wrote: >> Hi Jark, >> >> thanks for the update. I think the FLIP is in a really good shape now and ready to be voted. If others have no further comments? >> >> I have one last comment around the methods of the descriptor builders. When refactoring classes such as `KafkaConnector` or `ElasticsearchConnector`. We should align the method names with the new property names introduced in FLIP-122: >> >> KafkaConnector.newBuilder() >> // similar to scan.startup.mode=earliest-offset >> .scanStartupModeEarliest() >> // similar to sink.partitioner=round-robin >> .sinkPartitionerRoundRobin() >> >> What do you think? 
>> >> Thanks for driving this, >> Timo >> >> >> On 22.07.20 17:26, Jark Wu wrote: >>> Hi all, >>> >>> After some offline discussion with other people, I'm also fine with using >>> the builder pattern now, >>> even though I still think the `.build()` method is a little verbose in the >>> user code. >>> >>> I have updated the FLIP with following changes: >>> >>> 1) use builder pattern instead of "new" keyword. In order to avoid >>> duplicate code and reduce development burden for connector developers, >>> I introduced abstract classes `TableDescriptorBuilder` and >>> `FormatDescriptorBuilder`. >>> All the common methods are pre-defined in the base builder class, all >>> the custom descriptor builder should extend from the base builder classes. >>> And we can add more methods into the base builder class in the future >>> without changes in the connectors. >>> 2) use Expression instead of SQL expression string for computed column and >>> watermark strategy >>> 3) use `watermark(rowtime, expr)` as the watermark method. >>> 4) `Schema.column()` accepts `AbstractDataType` instead of `DataType` >>> 5) drop Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps >>> >>> A full example will look like this: >>> >>> tEnv.createTemporaryTable( >>> "MyTable", >>> KafkaConnector.newBuilder() >>> .version("0.11") >>> .topic("user_logs") >>> .property("bootstrap.servers", "localhost:9092") >>> .property("group.id <http://group.id/>", "test-group") >>> .startFromEarliest() >>> .sinkPartitionerRoundRobin() >>> .format(JsonFormat.newBuilder().ignoreParseErrors(false).build()) >>> .schema( >>> Schema.newBuilder() >>> .column("user_id", DataTypes.BIGINT()) >>> .column("user_name", DataTypes.STRING()) >>> .column("score", DataTypes.DECIMAL(10, 2)) >>> .column("log_ts", DataTypes.STRING()) >>> .column("part_field_0", DataTypes.STRING()) >>> .column("part_field_1", DataTypes.INT()) >>> .column("proc", proctime()) // define a processing-time >>> attribute with column name "proc" >>> .column("ts", toTimestamp($("log_ts"))) >>> .watermark("ts", $("ts").minus(lit(3).seconds())) >>> .primaryKey("user_id") >>> .build()) >>> .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't >>> support partitioned table yet, this is just an example for the API >>> .build() >>> ); >>> >>> I hope this resolves all your concerns. Welcome for further feedback! >>> >>> Updated FLIP: >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API#FLIP129:RefactorDescriptorAPItoregisterconnectorsinTableAPI-FormatDescriptor&FormatDescriptorBuilder <https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API#FLIP129:RefactorDescriptorAPItoregisterconnectorsinTableAPI-FormatDescriptor&FormatDescriptorBuilder> >>> >>> POC: >>> https://github.com/wuchong/flink/tree/descriptor-POC/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptor3 <https://github.com/wuchong/flink/tree/descriptor-POC/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptor3> >>> >>> Best, >>> Jark >>> >>> On Thu, 16 Jul 2020 at 20:18, Jark Wu <[hidden email]> <mailto:[hidden email]> wrote: >>> >>>> Thank you all for the discussion! >>>> >>>> Here are my comments: >>>> >>>> 2) I agree we should support Expression as a computed column. But I'm in >>>> favor of Leonard's point that maybe we can also support SQL string >>>> expression as a computed column. 
>>>> Because it also keeps aligned with DDL. The concern for Expression is that >>>> converting Expression to SQL string, or (de)serializing Expression is >>>> another topic not clear and may involve lots of work. >>>> Maybe we can support Expression later if time permits. >>>> >>>> 6,7) I still prefer the "new" keyword over builder. I don't think >>>> immutable is a strong reason. I care more about usability and experience >>>> from users and devs perspective. >>>> - Users need to type more words if using builder: >>>> `KafkaConnector.newBuilder()...build()` vs `new KafkaConnector()...` >>>> - It's more difficult for developers to write a descriptor. 2 classes >>>> (KafkaConnector and KafkaConnectorBuilder) + 5 more methods (builders, >>>> schema, partitionedBy, like, etc..). >>>> With the "new" keyword all the common methods are defined by the >>>> framework. >>>> - It's hard to have the same API style for different connectors, because >>>> the common methods are defined by users. For example, some may have >>>> `withSchema`, `partitionKey`, `withLike`, etc... >>>> >>>> 8) I'm -1 to `ConfigOption`. The ConfigOption is not used on `JsonFormat`, >>>> but the generic `Connector#option`. This doesn't work when using format >>>> options. >>>> >>>> new Connector("kafka") >>>> .option(JsonOptions.IGNORE_PARSE_ERRORS, true); // this is wrong, >>>> because "kafka" requires "json.ignore-parse-errors" as the option key, not >>>> the "ignore-parse-errors". >>>> >>>> >>>> ======================================== >>>> Hi Timo, regarding having a complete new stack, I have thought about that. >>>> But I still prefer to refactor the existing stack. Reasons: >>>> Because I think it will be more confusing if users will see two similar >>>> stacks and may have many problems if using the wrong class. >>>> For example, we may have two `Schema` and `TableDescriptor` classes. The >>>> `KafkaConnector` can't be used in legacy `connect()` API, >>>> the legacy `Kafka` class can't be used in the new `createTemporaryTable()` >>>> API. >>>> Besides, the existing API has been deprecated in 1.11, I think it's fine >>>> to remove them in 1.12. >>>> >>>> >>>> Best, >>>> Jark >>>> >>>> >>>> On Thu, 16 Jul 2020 at 15:26, Jingsong Li <[hidden email]> <mailto:[hidden email]> wrote: >>>> >>>>> Thanks for the discussion. >>>>> >>>>> Descriptor lacks the watermark and the computed column is too long. >>>>> >>>>> 1) +1 for just `column(...)` >>>>> >>>>> 2) +1 for being consistent with Table API, the Java Table API should be >>>>> Expression DSL. We don't need pure string support, users should just use >>>>> DDL instead. I think this is just a schema descriptor? The schema >>>>> descriptor should be consistent with DDL, so, definitely, it should >>>>> contain computed columns information. >>>>> >>>>> 3) +1 for not containing Schema#proctime and >>>>> Schema#watermarkFor#boundedOutOfOrderTimestamps, we can just leave them in >>>>> legacy apis. >>>>> >>>>> 6,7) +1 for removing "new" and builder and making it immutable, For Jark, >>>>> the starting method is the static method, the others are not. >>>>> >>>>> 8) +1 for `ConfigOption`, this can be consistent with `WritableConfig`. >>>>> For Leonard, I don't think user needs “json.fail-on-missing-field” rather >>>>> than “fail-on-missing-field”, user should >>>>> need “fail-on-missing-field” rather than “json.fail-on-missing-field", the >>>>> recommended way is "JsonFormat.newInstance().option(....)", should >>>>> configure options in the format scope. 
>>>>> Best, >>>>> Jingsong >>>>> -- >>>>> Best, Jingsong Lee |
Thanks Dawid,
Regarding (3), the point you mentioned about it affecting other behavior, e.g. listing tables, is a strong reason. I will at least consider implementing it in the `TableSourceQueryOperation` way to avoid affecting other behaviors. I have updated the FLIP. Anyway, this is an implementation detail; we can continue this discussion in JIRA and code review. Thank you all for the responses. It seems that everyone who participated in the discussion has reached a consensus. I will start a vote later today. Best, Jark On Fri, 24 Jul 2020 at 16:03, Leonard Xu <[hidden email]> wrote: > Thanks Jark for the update. > > The latest FLIP looks well. > > I like Dawid’s proposal of TableDescriptor. > > Best > Leonard Xu > > 在 2020年7月23日,22:56,Jark Wu <[hidden email]> 写道: > > Hi Timo, > > That's a good point I missed in the design. I have updated the FLIP and > added a note under the `KafkaConnector` to mention this. > I will not list all the method names in the FLIP as the design doc is > super long now. > > ================================================================ > Hi Dawid, > > 1) KafkaConnector not extending TableDescriptor > The reason why KafkaConnector extends TableDescriptor is that a builder > pattern "KafkaConnector.newBuilder()...build()" should return > "KafkaConnector" in theory. > So users can write something like the following code, which might be > more intuitive. > > KafkaConnector kafka = KafkaConnector.newBuilder()...build(); > tEnv.createTemporaryTable("MyTable", kafka); > > But I agree the connector implementation will be simpler if this is not > strongly needed, e.g. we don't need the generic type for the descriptor, > and we don't need to pass the descriptor class in the builder. So I'm also > fine to not extend it if others aren't against it. What's your opinion here @Timo > Walther <[hidden email]> ? > > 2) LikeOptions > I am not very satisfied with the new design, because the API is not very > fluent. Users will be interrupted to consider what the `overwrite()` > parameter should be. > And the API design doesn't protect users from using the wrong options > before running the code. > What about listing all possible options in one level? This would be more > aligned with SQL DDL and easy for users to understand and use. > > public enum LikeOption { > INCLUDING_ALL, > INCLUDING_CONSTRAINTS, > INCLUDING_GENERATED, > INCLUDING_OPTIONS, > INCLUDING_PARTITIONS, > INCLUDING_WATERMARKS, > > EXCLUDING_ALL, > EXCLUDING_CONSTRAINTS, > EXCLUDING_GENERATED, > EXCLUDING_OPTIONS, > EXCLUDING_PARTITIONS, > EXCLUDING_WATERMARKS, > > OVERWRITING_GENERATED, > OVERWRITING_OPTIONS > } > > 3) register the table under a generated table path > I'm afraid we have to do that. The generated table path is still needed > for `TableSourceTable#tableIdentifier`, which is used to calculate the > digest. > This requires that the registered table has a unique identifier. > The old `TableSourceQueryOperation` will also generate the identifier > according > to the hashcode of the TableSource object. However, the generated > identifier "Unregistered_TableSource_1234" can still conflict with > the user's table path. Therefore, I prefer to register the generated name > in the (temporary) catalog to throw explicit exceptions, rather than > generating a wrong plan. 
> > Best, > Jark > > On Thu, 23 Jul 2020 at 18:26, Dawid Wysakowicz <[hidden email]> > wrote: > >> Hi Jark, >> >> Thanks for the update. I think the FLIP looks really well on the high >> level. >> >> I have a few comments to the code structure in the FLIP: >> >> 1) I really don't like how the TableDescriptor exposes protected fields. >> Moreover why do we need to extend from it? I don't think we need >> KafkaConnector extends TableDescriptor and alike. We only need the builders >> e.g. the KafkaConnectorBuilder. >> >> If I understand it correctly this is the interface needed from the >> TableEnvironment perspective and it is the contract that the >> TableEnvironment expects. I would suggest making it an interface: >> @PublicEvolving >> public interface TableDescriptor { >> List<String> getPartitionedFields(); >> Schema getSchema(); >> Map<String, String> getOptions(); >> LikeOption[] getLikeOptions(); >> String getLikePath(); >> } >> >> Then the TableDescriptorBuilder would work with an internal >> implementation of this interface >> @PublicEvolving >> public abstract class TableDescriptorBuilder<BUILDER extends TableDescriptorBuilder<BUILDER>> >> { >> >> private final InternalTableDescriptor descriptor = new >> InternalTableDescriptor(); >> >> /** >> * Returns the this builder instance in the type of subclass. >> */ >> protected abstract BUILDER self(); >> >> /** >> * Specifies the table schema. >> */ >> public BUILDER schema(Schema schema) { >> descriptor.schema = schema; >> return self(); >> } >> >> /** >> * Specifies the partition keys of this table. >> */ >> public BUILDER partitionedBy(String... fieldNames) { >> checkArgument(descriptor.partitionedFields.isEmpty(), "partitionedBy(...) >> shouldn't be called more than once."); >> descriptor.partitionedFields.addAll(Arrays.asList(fieldNames)); >> return self(); >> } >> >> /** >> * Extends some parts from the original registered table path. >> */ >> public BUILDER like(String tablePath, LikeOption... likeOptions) { >> descriptor.likePath = tablePath; >> descriptor.likeOptions = likeOptions; >> return self(); >> } >> >> protected BUILDER option(String key, String value) { >> descriptor.options.put(key, value); >> return self(); >> } >> >> /** >> * Returns created table descriptor. >> */ >> public TableDescriptor build() { >> return descriptor; >> } >> } >> >> >> 2) I'm also not the biggest fun of how the LikeOptions are suggested in >> the doc. Can't we have something more like >> >> class LikeOption { >> >> public enum MergingStrategy { >> INCLUDING, >> EXCLUDING, >> OVERWRITING >> } >> >> public enum FeatureOption { >> ALL, >> CONSTRAINTS, >> GENERATED, >> OPTIONS, >> PARTITIONS, >> WATERMARKS >> } >> >> private final MergingStrategy mergingStrategy; >> private final FeatureOption featureOption; >> >> >> public static final LikeOption including(FeatureOption option) { >> >> return new LikeOption(MergingStrategy.INCLUDING, option); >> >> } >> >> public static final LikeOption overwriting(FeatureOption option) { >> >> Preconditions.checkArgument(option != ALL && ...); >> >> return new LikeOption(MergingStrategy.INCLUDING, option); >> >> } >> >> } >> >> >> 3) TableEnvironment#from(descriptor) will register descriptor under a >> system generated table path (just like TableImpl#toString) first, and scan >> from the table path to derive the Table. Table#executeInsert() does it in >> the similar way. >> >> I would try not to register the table under a generated table path. Do we >> really need that? 
I am pretty sure we can use the tables without >> registering them in a catalog. Similarly to the old >> TableSourceQueryOperation. >> >> Otherwise looks good >> >> Best, >> >> Dawid >> >> On 23/07/2020 10:35, Timo Walther wrote: >> >> Hi Jark, >> >> thanks for the update. I think the FLIP is in a really good shape now and >> ready to be voted. If others have no further comments? >> >> I have one last comment around the methods of the descriptor builders. >> When refactoring classes such as `KafkaConnector` or >> `ElasticsearchConnector`. We should align the method names with the new >> property names introduced in FLIP-122: >> >> KafkaConnector.newBuilder() >> // similar to scan.startup.mode=earliest-offset >> .scanStartupModeEarliest() >> // similar to sink.partitioner=round-robin >> .sinkPartitionerRoundRobin() >> >> What do you think? >> >> Thanks for driving this, >> Timo >> >> >> On 22.07.20 17:26, Jark Wu wrote: >> >> Hi all, >> >> After some offline discussion with other people, I'm also fine with using >> the builder pattern now, >> even though I still think the `.build()` method is a little verbose in >> the >> user code. >> >> I have updated the FLIP with following changes: >> >> 1) use builder pattern instead of "new" keyword. In order to avoid >> duplicate code and reduce development burden for connector developers, >> I introduced abstract classes `TableDescriptorBuilder` and >> `FormatDescriptorBuilder`. >> All the common methods are pre-defined in the base builder class, >> all >> the custom descriptor builder should extend from the base builder >> classes. >> And we can add more methods into the base builder class in the >> future >> without changes in the connectors. >> 2) use Expression instead of SQL expression string for computed column >> and >> watermark strategy >> 3) use `watermark(rowtime, expr)` as the watermark method. >> 4) `Schema.column()` accepts `AbstractDataType` instead of `DataType` >> 5) drop Schema#proctime and >> Schema#watermarkFor#boundedOutOfOrderTimestamps >> >> A full example will look like this: >> >> tEnv.createTemporaryTable( >> "MyTable", >> KafkaConnector.newBuilder() >> .version("0.11") >> .topic("user_logs") >> .property("bootstrap.servers", "localhost:9092") >> .property("group.id", "test-group") >> .startFromEarliest() >> .sinkPartitionerRoundRobin() >> >> .format(JsonFormat.newBuilder().ignoreParseErrors(false).build()) >> .schema( >> Schema.newBuilder() >> .column("user_id", DataTypes.BIGINT()) >> .column("user_name", DataTypes.STRING()) >> .column("score", DataTypes.DECIMAL(10, 2)) >> .column("log_ts", DataTypes.STRING()) >> .column("part_field_0", DataTypes.STRING()) >> .column("part_field_1", DataTypes.INT()) >> .column("proc", proctime()) // define a processing-time >> attribute with column name "proc" >> .column("ts", toTimestamp($("log_ts"))) >> .watermark("ts", $("ts").minus(lit(3).seconds())) >> .primaryKey("user_id") >> .build()) >> .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't >> support partitioned table yet, this is just an example for the API >> .build() >> ); >> >> I hope this resolves all your concerns. Welcome for further feedback! 
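As a minimal sketch of what point 1) above means for a connector developer, this is roughly what a custom connector builder on top of the abstract `TableDescriptorBuilder` (as sketched by Dawid earlier in the thread) could look like. `MyConnectorBuilder`, the "my-connector" identifier and the "hostname" option are invented names for illustration only and are not part of the FLIP:

public class MyConnectorBuilder extends TableDescriptorBuilder<MyConnectorBuilder> {

    public static MyConnectorBuilder newBuilder() {
        return new MyConnectorBuilder();
    }

    private MyConnectorBuilder() {
        // the generic option(...) method is inherited from the abstract base builder
        option("connector", "my-connector");
    }

    // only connector-specific options need to be defined here; schema(), partitionedBy(),
    // like() and build() all come from the base class
    public MyConnectorBuilder hostname(String hostname) {
        return option("hostname", hostname);
    }

    @Override
    protected MyConnectorBuilder self() {
        return this;
    }
}

Used like the Kafka example above:

tEnv.createTemporaryTable(
    "MyTable",
    MyConnectorBuilder.newBuilder()
        .hostname("localhost")
        .schema(
            Schema.newBuilder()
                .column("id", DataTypes.BIGINT())
                .build())
        .build());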
>> >> Updated FLIP: >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API#FLIP129:RefactorDescriptorAPItoregisterconnectorsinTableAPI-FormatDescriptor&FormatDescriptorBuilder >> >> POC: >> >> https://github.com/wuchong/flink/tree/descriptor-POC/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptor3 >> >> Best, >> Jark >> >> On Thu, 16 Jul 2020 at 20:18, Jark Wu <[hidden email]> >> <[hidden email]> wrote: >> >> Thank you all for the discussion! >> >> Here are my comments: >> >> 2) I agree we should support Expression as a computed column. But I'm in >> favor of Leonard's point that maybe we can also support SQL string >> expression as a computed column. >> Because it also keeps aligned with DDL. The concern for Expression is >> that >> converting Expression to SQL string, or (de)serializing Expression is >> another topic not clear and may involve lots of work. >> Maybe we can support Expression later if time permits. >> >> 6,7) I still prefer the "new" keyword over builder. I don't think >> immutable is a strong reason. I care more about usability and experience >> from users and devs perspective. >> - Users need to type more words if using builder: >> `KafkaConnector.newBuilder()...build()` vs `new KafkaConnector()...` >> - It's more difficult for developers to write a descriptor. 2 classes >> (KafkaConnector and KafkaConnectorBuilder) + 5 more methods (builders, >> schema, partitionedBy, like, etc..). >> With the "new" keyword all the common methods are defined by the >> framework. >> - It's hard to have the same API style for different connectors, >> because >> the common methods are defined by users. For example, some may have >> `withSchema`, `partitionKey`, `withLike`, etc... >> >> 8) I'm -1 to `ConfigOption`. The ConfigOption is not used on >> `JsonFormat`, >> but the generic `Connector#option`. This doesn't work when using format >> options. >> >> new Connector("kafka") >> .option(JsonOptions.IGNORE_PARSE_ERRORS, true); // this is wrong, >> because "kafka" requires "json.ignore-parse-errors" as the option key, >> not >> the "ignore-parse-errors". >> >> >> ======================================== >> Hi Timo, regarding having a complete new stack, I have thought about >> that. >> But I still prefer to refactor the existing stack. Reasons: >> Because I think it will be more confusing if users will see two similar >> stacks and may have many problems if using the wrong class. >> For example, we may have two `Schema` and `TableDescriptor` classes. The >> `KafkaConnector` can't be used in legacy `connect()` API, >> the legacy `Kafka` class can't be used in the new >> `createTemporaryTable()` >> API. >> Besides, the existing API has been deprecated in 1.11, I think it's fine >> to remove them in 1.12. >> >> >> Best, >> Jark >> >> >> On Thu, 16 Jul 2020 at 15:26, Jingsong Li <[hidden email]> >> <[hidden email]> wrote: >> >> Thanks for the discussion. >> >> Descriptor lacks the watermark and the computed column is too long. >> >> 1) +1 for just `column(...)` >> >> 2) +1 for being consistent with Table API, the Java Table API should be >> Expression DSL. We don't need pure string support, users should just use >> DDL instead. I think this is just a schema descriptor? The schema >> descriptor should be consistent with DDL, so, definitely, it should >> contain computed columns information. 
>> >> 3) +1 for not containing Schema#proctime and >> Schema#watermarkFor#boundedOutOfOrderTimestamps, we can just leave them >> in >> legacy apis. >> >> 6,7) +1 for removing "new" and builder and making it immutable, For Jark, >> the starting method is the static method, the others are not. >> >> 8) +1 for `ConfigOption`, this can be consistent with `WritableConfig`. >> For Leonard, I don't think user needs “json.fail-on-missing-field” rather >> than “fail-on-missing-field”, user should >> need “fail-on-missing-field” rather than “json.fail-on-missing-field", >> the >> recommended way is "JsonFormat.newInstance().option(....)", should >> configure options in the format scope. >> >> Best, >> Jingsong >> >> On Wed, Jul 15, 2020 at 9:30 PM Leonard Xu <[hidden email]> >> <[hidden email]> wrote: >> >> Thanks Jark bring this discussion and organize the FLIP document. >> >> Thanks Dawid and Timo for the feedback. Here are my thoughts. >> >> 1) I’m +1 with using column() for both cases. >> >> 2) Expression DSL vs pure SQL string for computed columns >> >> I think we can support them both and implement the pure SQL String first, >> I agree that Expression DSL brings more possibility and flexibility, but >> using SQL string is a more unified way which can reuse most logic with >> DDL >> like validation and persist in Catalog, >> and Converting Expression DSL to SQL Expression is another big topic and >> I did not figure out a feasible idea until now. >> So, maybe we can postpone the Expression DSL support considered the >> reality. >> >> 3) Methods Schema#proctime and >> Schema#watermarkFor#boundedOutOfOrderTimestamps >> >> +1 with Dawid’s proposal to offer SQL like methods. >> Schema() >> .column("proctime", proctime()); >> .watermarkFor("rowtime", $("rowtime").minus(lit(3).seconds())) >> And we can simplify watermarkFor(“colName”, Expression >> watermarkStrategy)to watermark(“colName”, Expression watermarkStrategy), >> I >> think the later one has can express the meaning of “ WATERMARK FOR >> column_name AS watermark_strategy_expression“ well. >> >> 5)6)7) The new keyword vs the static method vs builder pattern >> >> I have not strong tendency, the new keyword and the static method on >> descriptor can nearly treated as a builder and do same things like >> builder. >> For the builder pattern, we will introduce six >> methods(connector.Builder()、connector.Builder.build(), format.Builder(), >> format.Builder.build(), Schema.Builder(),Schema.Builder.build() ),I think >> we could reduce these unnecessary methods. I ‘m slightly +1 for new >> keyword if we need a choice. >> >> 8) `Connector.option(...)` class should also accept `ConfigOption` >> I’m slightly -1 for this, ConfigOption may not work because the key for >> format configOption has not format prefix eg: FAIL_ON_MISSING_FIELD of >> json, we need “json.fail-on-missing-field” rather than >> “fail-on-missing-field”. >> >> public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = >> ConfigOptions >> .key("fail-on-missing-field") >> .booleanType() >> .defaultValue(false) >> >> WDYT? >> >> Best, >> Leonard Xu >> >> >> 在 2020年7月15日,16:37,Timo Walther <[hidden email]> <[hidden email]> >> 写道: >> >> Hi Jark, >> >> thanks for working on this issue. It is time to fix this last part of >> >> inconsistency in the API. I also like the core parts of the FLIP, esp. >> that >> TableDescriptor is one entity that can be passed to different methods. 
>> Here >> is some feedback from my side: >> >> >> 1) +1 for just `column(...)` >> >> 2) Expression DSL vs pure SQL string for computed columns >> I agree with Dawid. Using the Expression DSL is desireable for a >> >> consistent API. Furthermore, otherwise people need to register functions >> if >> they want to use them in an expression. Refactoring TableSchema is >> definitely on the list for 1.12. Maybe we can come up with some >> intermediate solution where we transform the expression to a SQL >> expression >> for the catalog. Until the discussions around FLIP-80 and >> CatalogTableSchema have been finalized. >> >> >> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps >> We should design the descriptor very close to the SQL syntax. The more >> >> similar the syntax the more likely it is too keep the new descriptor API >> stable. >> >> >> 6) static method vs new keyword >> Actually, the `new` keyword was one of the things that bothered me >> >> most in the old design. Fluent APIs avoid this nowadays. >> >> >> 7) make the descriptors immutable with builders >> The descriptors are some kind of builders already. But they are not >> >> called "builder". Instead of coming up with the new concept of a >> "descriptor", we should use terminology that people esp. Java/Scala users >> are familiar with already. >> >> >> We could make the descriptors immutable to pass them around easily. >> >> Btw "Connector" and "Format" should always be in the classname. This >> >> was also a mistake in the past. Instead of calling the descriptor just >> `Kafka` we could call it `KafkaConnector`. An entire example could look >> like: >> >> >> tEnv.createTemporaryTable( >> "OrdersInKafka", >> KafkaConnector.newBuilder() // builder pattern supported by IDE >> .topic("user_logs") >> .property("bootstrap.servers", "localhost:9092") >> .property("group.id", "test-group") >> .format(JsonFormat.newInstance()) // shortcut for no parameters >> .schema( >> Schema.newBuilder() >> .column("user_id", DataTypes.BIGINT()) >> .column("score", DataTypes.DECIMAL(10, 2)) >> .column("log_ts", DataTypes.TIMESTAMP(3)) >> .column("my_ts", toTimestamp($("log_ts")) >> .build() >> ) >> .build() >> ); >> >> Instead of refacoring the existing classes, we could also think about >> >> a completly new stack. I think this would avoid confusion for the old >> users. We could deprecate the entire `Kafka` class instead of dealing >> with >> backwards compatibility. >> >> >> 8) minor extensions >> A general `Connector.option(...)` class should also accept >> >> `ConfigOption` instead of only strings. >> >> A `Schema.column()` should accept `AbstractDataType` that can be >> >> resolved to a `DataType` by access to a `DataTypeFactory`. >> >> >> What do you think? >> >> Thanks, >> Timo >> >> >> On 09.07.20 18:51, Jark Wu wrote: >> >> Hi Dawid, >> Thanks for the great feedback! Here are my responses: >> 1) computedColumn(..) vs column(..) >> I'm fine to use `column(..)` in both cases. >> 2) Expression DSL vs pure SQL string for computed columns >> This is a good point. Actually, I also prefer to use Expression DSL >> >> because >> >> this is more Table API style. >> However, this requires to modify TableSchema again to accept & expose >> Expression as computed columns. >> I'm not convinced about this, because AFAIK, we want to have a >> CatalogTableSchema to hold this information >> and don't want to extend TableSchema. Maybe Timo can give some points >> >> here. 
>> >> Besides, this will make the descriptor API can't be persisted in >> >> Catalog >> >> unless FLIP-80 is done. >> 3) Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps >> The original intention behind these APIs are providing shortcut APIs >> >> for >> >> Table API users. >> But I'm also fine to only provide the DDL-like methods if you have >> concerns. We can discuss shortcuts in the future if users request. >> 4) LikeOption >> LikeOption.INCLUDING.ALL is a constant (enum values). I have added >> >> more >> >> description about this in the FLIP. >> 5) implementation? >> I don't want to mention too much about implementation details in the >> >> FLIP >> >> at the beginning, because the API is already very long. >> But I also added an "Implementation" section to explain them. >> 6) static method vs new keyword >> Personally I prefer the new keyword because it makes the API cleaner. >> >> If we >> >> want remove new keyword and use static methods, we have to: >> Either adding a `Schema.builder()/create()` method as the starting >> >> method, >> >> Or duplicating all the methods as static methods, e.g. we have 12 >> >> methods >> >> in `Kafka`, any of them can be a starting method, then we will have 24 >> methods in `Kafka`. >> Both are not good, and it's hard to keep all the descriptors having >> >> the >> >> same starting method name, but all the descriptors can start from the >> >> same >> >> new keyword. >> Best, >> Jark >> On Thu, 9 Jul 2020 at 15:48, Dawid Wysakowicz <[hidden email] >> >> >> wrote: >> >> Correction to my point 4. The example is correct. I did not read it >> carefully enough. Sorry for the confusion. Nevertheless I'd still >> >> like >> >> to see a bit more explanation on the LikeOptions. >> >> On 07/07/2020 04:32, Jark Wu wrote: >> >> Hi everyone, >> >> Leonard and I prepared a FLIP about refactoring current Descriptor >> >> API, >> >> i.e. TableEnvironment#connect(). We would like to propose a new >> >> descriptor >> >> API to register connectors in Table API. >> >> Since Flink 1.9, the community focused more on the new SQL DDL >> >> feature. >> >> After a series of releases, the SQL DDL is powerful and has many >> >> rich >> >> features now. However, Descriptor API (the >> >> `TableEnvironment#connect()`) >> >> has been stagnant for a long time and missing lots of core >> >> features, such >> >> as computed columns and primary keys. That's frustrating for Table >> >> API >> >> users who want to register tables programmatically. Besides, >> >> currently, a >> >> connector must implement a corresponding Descriptor (e.g. `new >> >> Kafka()`) >> >> before using the "connect" API. Therefore, we hope to reduce this >> >> effort >> >> for connector developers, that custom source/sinks can be >> >> registered via >> >> the descriptor API without implementing a Descriptor. >> >> These are the problems we want to resolve in this FLIP. I'm looking >> >> forward >> >> to your comments. >> >> >> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connector+for+Table+API >> >> >> Best, >> Jark >> >> >> >> >> >> >> >> -- >> Best, Jingsong Lee >> >> >> >> >> > |
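For reference, a compilable sketch of the LikeOption variant Dawid proposes above. The constructor, the excluding(...) factory and the concrete argument check in overwriting(...) are filled in here and are not part of the quoted sketch; the quoted overwriting(...) factory also returns MergingStrategy.INCLUDING, which presumably was meant to be OVERWRITING. The thread leaves open whether this class or Jark's flat enum becomes the final shape:

public final class LikeOption {

    public enum MergingStrategy {
        INCLUDING,
        EXCLUDING,
        OVERWRITING
    }

    public enum FeatureOption {
        ALL,
        CONSTRAINTS,
        GENERATED,
        OPTIONS,
        PARTITIONS,
        WATERMARKS
    }

    private final MergingStrategy mergingStrategy;
    private final FeatureOption featureOption;

    private LikeOption(MergingStrategy mergingStrategy, FeatureOption featureOption) {
        this.mergingStrategy = mergingStrategy;
        this.featureOption = featureOption;
    }

    public static LikeOption including(FeatureOption option) {
        return new LikeOption(MergingStrategy.INCLUDING, option);
    }

    public static LikeOption excluding(FeatureOption option) {
        return new LikeOption(MergingStrategy.EXCLUDING, option);
    }

    public static LikeOption overwriting(FeatureOption option) {
        // mirrors the flat enum above, which only defines OVERWRITING_GENERATED and OVERWRITING_OPTIONS
        if (option != FeatureOption.GENERATED && option != FeatureOption.OPTIONS) {
            throw new IllegalArgumentException("OVERWRITING is not supported for " + option);
        }
        return new LikeOption(MergingStrategy.OVERWRITING, option);
    }

    public MergingStrategy getMergingStrategy() {
        return mergingStrategy;
    }

    public FeatureOption getFeatureOption() {
        return featureOption;
    }
}

// call site, e.g.:
// .like("OrdersTemplate", LikeOption.including(LikeOption.FeatureOption.ALL),
//       LikeOption.overwriting(LikeOption.FeatureOption.OPTIONS))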
I'm jumping in quite late but I think overall this is a very good effort
and it's in very good shape now. Best, Aljoscha |