[DISCUSS] Improve TableFactory


[DISCUSS] Improve TableFactory

Jingsong Li
Hi dev,

I'd like to kick off a discussion on the improvement of TableSourceFactory
and TableSinkFactory.

Motivation:
The main needs and problems right now are:
1. A connector can't get the TableConfig [1], and some behaviors really need to be
controlled by the user's table configuration. In the era of catalogs, we
can't put these options into the connector properties, which is too inconvenient.
2. A connector can't know whether it runs in batch or streaming execution mode,
but the sink implementations for batch and streaming are totally different. I understand
there is an update-mode property now, but it splits batch and streaming at
the catalog level. In fact, this information can be obtained through
the current TableEnvironment.
3. There is no interface for triggering validation. Today our validation lives in
utility classes, so whether it runs depends on whether the connector calls them.
We now have some new validations to add, such as [2], which really confuses users
and even developers. Another problem is that our SQL DDL has no
validation [3]. It is better to report an error when executing the DDL;
otherwise it will confuse the user.

Proposed change draft for 1 and 2:

interface CatalogTableContext {
   ObjectPath getTablePath();
   CatalogTable getTable();
   ReadableConfig getTableConfig();
   boolean isStreamingMode();
}

public interface TableSourceFactory<T> extends TableFactory {

   default TableSource<T> createTableSource(CatalogTableContext context) {
      return createTableSource(context.getTablePath(), context.getTable());
   }

   ......
}

Proposed change draft for 3:

public interface TableFactory {

   TableValidators validators();

   interface TableValidators {
      ConnectorDescriptorValidator connectorValidator();
      TableSchemaValidator schemaValidator();
      FormatDescriptorValidator formatValidator();
   }
}
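To make the intent of the drafts concrete, here is a minimal, self-contained sketch of how a connector could use such a context. All types below are simplified local stand-ins for illustration only, not Flink's actual classes, and the option key is just an example:

```java
import java.util.Map;

// Hypothetical, simplified model of the proposed CatalogTableContext;
// the real proposal uses CatalogTable and ReadableConfig instead of maps.
public class ContextSketch {

    interface CatalogTableContext {
        Map<String, String> getTableConfig(); // stand-in for ReadableConfig
        boolean isStreamingMode();
    }

    // With the context, a connector can branch on execution mode and read
    // the user's table configuration, addressing motivations 1 and 2.
    static String createTableSource(CatalogTableContext context) {
        String parallelism = context.getTableConfig()
                .getOrDefault("table.exec.resource.default-parallelism", "1");
        return (context.isStreamingMode() ? "streaming" : "batch")
                + "-source(parallelism=" + parallelism + ")";
    }

    // Helper to build a context instance for the example.
    static CatalogTableContext contextOf(boolean streaming, Map<String, String> config) {
        return new CatalogTableContext() {
            public Map<String, String> getTableConfig() { return config; }
            public boolean isStreamingMode() { return streaming; }
        };
    }
}
```

For example, `createTableSource(contextOf(true, Map.of()))` builds a streaming source with the default parallelism.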

What do you think?

[1] https://issues.apache.org/jira/browse/FLINK-15290
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
[3] https://issues.apache.org/jira/browse/FLINK-15509

Best,
Jingsong Lee

Re: [DISCUSS] Improve TableFactory

bowen.li
Hi Jingsong,

The 1st and 2nd pain points you described are very valid, as I'm more
familiar with them. I agree these are shortcomings of the current Flink SQL
design.

A couple comments on your 1st proposal:

1. Is it better to have explicit APIs like "createBatchTableSource(...)"
and "createStreamingTableSource(...)" in TableSourceFactory (similar for the
sink factory), and let the planner decide which mode (streaming vs
batch) of source should be instantiated? That way we don't always need to
make connector developers handle an if-else on isStreamingMode.
2. I'm not sure of the benefits of having a CatalogTableContext class. The
path, table, and config are fairly independent of each other. So why not
pass the config in as a 3rd parameter, as `createXxxTableSource(path,
catalogTable, tableConfig)`?



Re: [DISCUSS] Improve TableFactory

Timo Walther-2
Hi Jingsong,

+1 for adding a context in the source and sink factories. A context
class also allows for future modifications without touching the
TableFactory interface again.

How about:

interface TableSourceFactory {
     interface Context {
        // ...
     }
}

Because I find the name `CatalogTableContext` confusing, and we can bind
the interface to the factory class itself as an inner interface.

Readable access to the configuration also sounds right to me. Can we remove
the `ObjectPath getTablePath()` method? I don't see a reason why a
factory should know the path. And if we keep it, it should be an
`ObjectIdentifier` instead, so that we also know about the catalog we are using.

`isStreamingMode()` should be renamed to `isBounded()`, because we
would like to use terminology around boundedness rather than
streaming/batch.

@Bowen: We are in the process of unifying the APIs and thus want to
explicitly avoid specialized methods in the future.

Can we postpone the change of TableValidators? I don't think that every
factory needs a schema validator. Ideally, the factory should just
return a List<ConfigOption> or ConfigOptionGroup that contains the
validation logic as mentioned in the validation part of FLIP-54[1]. But
currently our config options are not rich enough to have a unified
validation. Additionally, the factory should return some properties such
as "supports event-time" for the schema validation outside of the
factory itself.
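A rough sketch of what option-driven validation could look like, assuming a simplified descriptor (FLIP-54's actual ConfigOption API is richer and may differ): the factory declares its options once, and a generic routine checks the DDL properties eagerly instead of relying on per-connector utility calls.

```java
import java.util.List;
import java.util.Map;

public class OptionValidationSketch {

    // Hypothetical, simplified option descriptor for illustration.
    record Option(String key, boolean required) {}

    // Generic validation driven by the declared options; DDL execution can
    // fail fast with a clear message listing everything that is missing.
    static List<String> validate(List<Option> declared, Map<String, String> props) {
        return declared.stream()
                .filter(o -> o.required() && !props.containsKey(o.key()))
                .map(o -> "Missing required option: " + o.key())
                .toList();
    }
}
```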

Regards,
Timo

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration 




Re: [DISCUSS] Improve TableFactory

Jark Wu-2
Hi Jingsong,

I'm also +1 for adding a context in the source and sink factories.

Based on Timo's proposal, IMO:

1) The table path might be useful, because it is a part of the DDL and users
might use it as the source/sink display name in explainSource().
2) isStreamingMode vs isBounded: a bounded source can also be
executed in streaming mode. I think `isStreamingMode`
is fine because it is intuitive, and EnvironmentSettings also uses this
terminology. My concern is: should we consider the unified mode
that may be introduced in the future?

Best,
Jark




Re: [DISCUSS] Improve TableFactory

Jingsong Li
In reply to this post by Timo Walther-2
Thanks Bowen and Timo for getting involved.

Hi Bowen,

> 1. is it better to have explicit APIs like "createBatchTableSource(...)"
I think it is better to keep one method, since in [1] we reached consensus
in the DataStream layer to maintain a single API in "env.source". I think it is
good not to split batch and stream, and our TableSource/TableSink are the
same classes for both batch and streaming anyway.

> 2. I'm not sure of the benefits to have a CatalogTableContext class.
As Timo said, we may have more parameters to add in the future; take a look
at "AbstractRichFunction.RuntimeContext", which was extended little by little.
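The evolvability argument can be illustrated in a few lines (a hypothetical sketch; the names are illustrative, not Flink's): a getter added later with a default implementation does not break factories compiled against the older Context, whereas adding a positional parameter would.

```java
public class EvolvableContextSketch {

    interface Context {
        String tableName();
        // Added in a later version: existing implementations keep compiling
        // because the new getter has a default.
        default boolean isBounded() { return false; }
    }

    // A factory method written against the v1 Context still works after
    // the interface gained isBounded().
    static String describe(Context ctx) {
        return ctx.tableName() + (ctx.isBounded() ? " (bounded)" : " (unbounded)");
    }
}
```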

Hi Timo,

Your suggestion about the Context looks good to me.
"TablePath" is used in Hive for updating the catalog information of this
table. Yes, "ObjectIdentifier" looks better than "ObjectPath".

> Can we postpone the change of TableValidators?
Yes, ConfigOption validation looks good to me. It seems that you have been
thinking about this for a long time, and it looks very good. Looking forward
to the progress of FLIP-54.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692

Best,
Jingsong Lee


Re: [DISCUSS] Improve TableFactory

Jingsong Li
Hi Jark,

Thanks for getting involved. Yes, it's hard to justify adding isBounded on the
source.
I recommend adding it only to the sink at present, because the sink has an
upstream, and its upstream is either bounded or unbounded.

Hi all,

Let me summarize with your suggestions.

public interface TableSourceFactory<T> extends TableFactory {

   ......

   /**
    * Creates and configures a {@link TableSource} based on the given {@link Context}.
    *
    * @param context context of this table source.
    * @return the configured table source.
    */
   default TableSource<T> createTableSource(Context context) {
      ObjectIdentifier tableIdentifier = context.getTableIdentifier();
      return createTableSource(
            new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
            context.getTable());
   }

   /**
    * Context of table source creation. Contains table information and
    * environment information.
    */
   interface Context {

      /**
       * @return full identifier of the given {@link CatalogTable}.
       */
      ObjectIdentifier getTableIdentifier();

      /**
       * @return table {@link CatalogTable} instance.
       */
      CatalogTable getTable();

      /**
       * @return readable config of this table environment.
       */
      ReadableConfig getTableConfig();
   }
}

public interface TableSinkFactory<T> extends TableFactory {

   ......

   /**
    * Creates and configures a {@link TableSink} based on the given {@link Context}.
    *
    * @param context context of this table sink.
    * @return the configured table sink.
    */
   default TableSink<T> createTableSink(Context context) {
      ObjectIdentifier tableIdentifier = context.getTableIdentifier();
      return createTableSink(
            new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
            context.getTable());
   }

   /**
    * Context of table sink creation. Contains table information and
    * environment information.
    */
   interface Context {

      /**
       * @return full identifier of the given {@link CatalogTable}.
       */
      ObjectIdentifier getTableIdentifier();

      /**
       * @return table {@link CatalogTable} instance.
       */
      CatalogTable getTable();

      /**
       * @return readable config of this table environment.
       */
      ReadableConfig getTableConfig();

      /**
       * @return whether the input is bounded.
       */
      boolean isBounded();
   }
}
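For illustration, a self-contained sketch of how a connector-side sink factory might use isBounded() from such a context; all types here are simplified stand-ins, and the sink names are made up:

```java
public class SinkFactorySketch {

    // Simplified stand-in for the proposed TableSinkFactory.Context.
    interface SinkContext {
        String getTableIdentifier();
        boolean isBounded();
    }

    // One factory method can now pick the right sink implementation from
    // the context, instead of relying on a catalog-level update mode.
    static String createTableSink(SinkContext context) {
        return (context.isBounded() ? "BatchFileSink" : "StreamingFileSink")
                + "[" + context.getTableIdentifier() + "]";
    }

    // Helper to build a context instance for the example.
    static SinkContext contextOf(String id, boolean bounded) {
        return new SinkContext() {
            public String getTableIdentifier() { return id; }
            public boolean isBounded() { return bounded; }
        };
    }
}
```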

If there is no objection, I will start a vote thread. (if necessary, I can
also edit a FLIP).

Best,
Jingsong Lee

On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <[hidden email]> wrote:

> Thanks Bowen and Timo for involving.
>
> Hi Bowen,
>
> > 1. is it better to have explicit APIs like "createBatchTableSource(...)"
> I think it is better to keep one method, since in [1], we have reached one
> in DataStream layer to maintain a single API in "env.source". I think it is
> good to not split batch and stream, And our TableSource/TableSink are the
> same class for both batch and streaming too.
>
> > 2. I'm not sure of the benefits to have a CatalogTableContext class.
> As Timo said, We may have more parameters to add in the future, take a
> look to "AbstractRichFunction.RuntimeContext", It's added little by little.
>
> Hi Timo,
>
> Your suggestion about Context looks good to me.
> "TablePath" used in Hive for updating the catalog information of this
> table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
>
> > Can we postpone the change of TableValidators?
> Yes, ConfigOption validation looks good to me. It seems that you have been
> thinking about this for a long time. It's very good. Looking forward to the
> promotion of FLIP-54.
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <[hidden email]> wrote:
>
>> Hi Jingsong,
>>
>> +1 for adding a context in the source and sink factories. A context
>> class also allows for future modifications without touching the
>> TableFactory interface again.
>>
>> How about:
>>
>> interface TableSourceFactory {
>>      interface Context {
>>         // ...
>>      }
>> }
>>
>> Because I find the name `CatalogTableContext` confusing and we can bound
>> the interface to the factory class itself as an inner interface.
>>
>> Readable access to configuration sounds also right to me. Can we remove
>> the `ObjectPath getTablePath()` method? I don't see a reason why a
>> factory should know the path. And if so, it should be an
>> `ObjectIdentifier` instead to also know about the catalog we are using.
>>
>> The `isStreamingMode()` should be renamed to `isBounded()` because we
>> would like to use terminology around boundedness rather than
>> streaming/batch.
>>
>> @Bowen: We are in the process of unifying the APIs and thus explicitly
>> avoid specialized methods in the future.
>>
>> Can we postpone the change of TableValidators? I don't think that every
>> factory needs a schema validator. Ideally, the factory should just
>> return a List<ConfigOption> or ConfigOptionGroup that contains the
>> validation logic as mentioned in the validation part of FLIP-54[1]. But
>> currently our config options are not rich enough to have a unified
>> validation. Additionally, the factory should return some properties such
>> as "supports event-time" for the schema validation outside of the
>> factory itself.
>>
>> Regards,
>> Timo
>>
>> [1]
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>>
>>
>>
>> On 16.01.20 00:51, Bowen Li wrote:
>> > Hi Jingsong,
>> >
>> > The 1st and 2nd pain points you described are very valid, as I'm more
>> > familiar with them. I agree these are shortcomings of the current Flink
>> SQL
>> > design.
>> >
>> > A couple comments on your 1st proposal:
>> >
>> > 1. is it better to have explicit APIs like "createBatchTableSource(...)"
>> > and "createStreamingTableSource(...)" in TableSourceFactory (would be
>> > similar for sink factory) to let planner handle which mode (streaming vs
>> > batch) of source should be instantiated? That way we don't need to
>> always
>> > let connector developers handling an if-else on isStreamingMode.
>> > 2. I'm not sure of the benefits to have a CatalogTableContext class. The
>> > path, table, and config are fairly independent of each other. So why not
>> > pass the config in as 3rd parameter as `createXxxTableSource(path,
>> > catalogTable, tableConfig)?
>> >
>> >
>> > On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <[hidden email]>
>> wrote:
>> >
>> >> Hi dev,
>> >>
>> >> I'd like to kick off a discussion on the improvement of
>> TableSourceFactory
>> >> and TableSinkFactory.
>> >>
>> >> Motivation:
>> >> Now the main needs and problems are:
>> >> 1.Connector can't get TableConfig [1], and some behaviors really need
>> to be
>> >> controlled by the user's table configuration. In the era of catalog, we
>> >> can't put these config in connector properties, which is too


--
Best, Jingsong Lee

Re: [DISCUSS] Improve TableFactory

Jingsong Li
Hi all,

After rethinking and a discussion with Kurt, I'd like to remove "isBounded".
We can delay this boundedness information to the TableSink.
With the TableSink refactor, we need to consider "consumeDataStream"
and "consumeBoundedStream".
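To make the "delay boundedness to the sink" idea concrete, here is a minimal, self-contained sketch. `UnifiedTableSink`, `consumeDataStream` and `consumeBoundedStream` are hypothetical names taken from this discussion, not existing Flink API, and the toy types below stand in for real stream abstractions:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: the sink, not the factory, learns per query whether
// its input is bounded, via two dedicated consume methods.
public class BoundednessSketch {

    interface UnifiedTableSink<T> {
        void consumeDataStream(Iterator<T> unbounded); // streaming input path
        void consumeBoundedStream(List<T> bounded);    // batch input path
    }

    // Trivial implementation that records which path was taken.
    static class CollectingSink implements UnifiedTableSink<String> {
        final List<String> collected = new ArrayList<>();
        boolean boundedPath;

        @Override
        public void consumeDataStream(Iterator<String> unbounded) {
            boundedPath = false;
            unbounded.forEachRemaining(collected::add);
        }

        @Override
        public void consumeBoundedStream(List<String> bounded) {
            boundedPath = true;
            collected.addAll(bounded);
        }
    }

    public static void main(String[] args) {
        CollectingSink sink = new CollectingSink();
        sink.consumeBoundedStream(Arrays.asList("a", "b"));
        System.out.println(sink.boundedPath + " " + sink.collected); // true [a, b]
    }
}
```

The point of the sketch is that boundedness becomes a per-query property delivered at consumption time, so the factory and the catalog no longer need to carry it.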

Best,
Jingsong Lee

On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <[hidden email]> wrote:

> Hi Jark,
>
> Thanks for getting involved. Yes, it's hard to understand adding isBounded
> on the source.
> I recommend adding it only to the sink at present, because a sink has an
> upstream, and that upstream is either bounded or unbounded.
>
> Hi all,
>
> Let me summarize with your suggestions.
>
> public interface TableSourceFactory<T> extends TableFactory {
>
>    ......
>
>
>    /**
>     * Creates and configures a {@link TableSource} based on the given {@link Context}.
>     *
>     * @param context context of this table source.
>     * @return the configured table source.
>     */
>    default TableSource<T> createTableSource(Context context) {
>       ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>       return createTableSource(
>             new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
>             context.getTable());
>    }
>
>    /**
>     * Context of table source creation. Contains table information and environment information.
>     */
>    interface Context {
>
>       /**
>        * @return full identifier of the given {@link CatalogTable}.
>        */
>       ObjectIdentifier getTableIdentifier();
>
>       /**
>        * @return table {@link CatalogTable} instance.
>        */
>       CatalogTable getTable();
>
>       /**
>        * @return readable config of this table environment.
>        */
>       ReadableConfig getTableConfig();
>    }
> }
>
> public interface TableSinkFactory<T> extends TableFactory {
>
>    ......
>
>    /**
>     * Creates and configures a {@link TableSink} based on the given {@link Context}.
>     *
>     * @param context context of this table sink.
>     * @return the configured table sink.
>     */
>    default TableSink<T> createTableSink(Context context) {
>       ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>       return createTableSink(
>             new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
>             context.getTable());
>    }
>
>    /**
>     * Context of table sink creation. Contains table information and environment information.
>     */
>    interface Context {
>
>       /**
>        * @return full identifier of the given {@link CatalogTable}.
>        */
>       ObjectIdentifier getTableIdentifier();
>
>       /**
>        * @return table {@link CatalogTable} instance.
>        */
>       CatalogTable getTable();
>
>       /**
>        * @return readable config of this table environment.
>        */
>       ReadableConfig getTableConfig();
>
>       /**
>        * @return whether the input is bounded.
>        */
>       boolean isBounded();
>    }
> }
>
> If there is no objection, I will start a vote thread. (If necessary, I can
> also write a FLIP.)
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <[hidden email]>
> wrote:
>
>> Thanks Bowen and Timo for involving.
>>
>> Hi Bowen,
>>
>> > 1. is it better to have explicit APIs like "createBatchTableSource(...)"
>> I think it is better to keep one method, since in [1] we have reached
>> consensus in the DataStream layer to maintain a single API in "env.source".
>> I think it is good not to split batch and stream, and our
>> TableSource/TableSink classes are already shared by batch and streaming.
>>
>> > 2. I'm not sure of the benefits to have a CatalogTableContext class.
>> As Timo said, we may have more parameters to add in the future; take a
>> look at "AbstractRichFunction.RuntimeContext", which was extended little by
>> little.
>>
>> Hi Timo,
>>
>> Your suggestion about Context looks good to me.
>> "TablePath" is used in Hive for updating the catalog information of this
>> table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
>>
>> > Can we postpone the change of TableValidators?
>> Yes, ConfigOption validation looks good to me. It seems that you have
>> been thinking about this for a long time. It's very good. Looking forward
>> to the promotion of FLIP-54.
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <[hidden email]> wrote:
>>
>>> Hi Jingsong,
>>>
>>> +1 for adding a context in the source and sink factories. A context
>>> class also allows for future modifications without touching the
>>> TableFactory interface again.
>>>
>>> How about:
>>>
>>> interface TableSourceFactory {
>>>      interface Context {
>>>         // ...
>>>      }
>>> }
>>>
>>> Because I find the name `CatalogTableContext` confusing, and we can bind
>>> the interface to the factory class itself as an inner interface.
>>>
>>> Readable access to the configuration also sounds right to me. Can we remove
>>> the `ObjectPath getTablePath()` method? I don't see a reason why a
>>> factory should know the path. And if so, it should be an
>>> `ObjectIdentifier` instead to also know about the catalog we are using.
>>>
>>> The `isStreamingMode()` should be renamed to `isBounded()` because we
>>> would like to use terminology around boundedness rather than
>>> streaming/batch.
>>>
>>> @Bowen: We are in the process of unifying the APIs and thus explicitly
>>> avoid specialized methods in the future.
>>>
>>> Can we postpone the change of TableValidators? I don't think that every
>>> factory needs a schema validator. Ideally, the factory should just
>>> return a List<ConfigOption> or ConfigOptionGroup that contains the
>>> validation logic as mentioned in the validation part of FLIP-54[1]. But
>>> currently our config options are not rich enough to have a unified
>>> validation. Additionally, the factory should return some properties such
>>> as "supports event-time" for the schema validation outside of the
>>> factory itself.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1]
>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>>>
>>>
>>>
>>> On 16.01.20 00:51, Bowen Li wrote:
>>> > Hi Jingsong,
>>> >
>>> > The 1st and 2nd pain points you described are very valid, as I'm more
>>> > familiar with them. I agree these are shortcomings of the current
>>> Flink SQL
>>> > design.
>>> >
>>> > A couple comments on your 1st proposal:
>>> >
>>> > 1. is it better to have explicit APIs like
>>> "createBatchTableSource(...)"
>>> > and "createStreamingTableSource(...)" in TableSourceFactory (would be
>>> > similar for sink factory) to let planner handle which mode (streaming
>>> vs
>>> > batch) of source should be instantiated? That way we don't need to always
>>> > let connector developers handle an if-else on isStreamingMode.
>>> > 2. I'm not sure of the benefits to have a CatalogTableContext class.
>>> The
>>> > path, table, and config are fairly independent of each other. So why not
>>> > pass the config in as a 3rd parameter, i.e. `createXxxTableSource(path,
>>> > catalogTable, tableConfig)`?
>>> >
>>> >
>>> > On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <[hidden email]>
>>> wrote:
>>> >
>>> >> Hi dev,
>>> >>
>>> >> I'd like to kick off a discussion on the improvement of
>>> TableSourceFactory
>>> >> and TableSinkFactory.
>>> >>
>>> >> Motivation:
>>> >> Now the main needs and problems are:
>>> >> 1.Connector can't get TableConfig [1], and some behaviors really need
>>> to be
>>> >> controlled by the user's table configuration. In the era of catalog,
>>> we
>>> >> can't put these config in connector properties, which is too
>>> inconvenient.
>>> >> 2.Connector can't know if this is batch or stream execution mode. But
>>> the
>>> >> sink implementation of batch and stream is totally different. I
>>> understand
>>> >> there is an update mode property now, but it splits the batch and
>>> stream in
>>> >> the catalog dimension. In fact, this information can be obtained
>>> through
>>> >> the current TableEnvironment.
>>> >> 3.No interface to call validation. Now our validation is more util
>>> classes.
>>> >> It depends on whether or not the connector calls. Now we have some new
>>> >> validations to add, such as [2], which is really confuse uses, even
>>> >> developers. Another problem is that our SQL update (DDL) does not have
>>> >> validation [3]. It is better to report an error when executing DDL,
>>> >> otherwise it will confuse the user.
>>> >>
>>> >> Proposed change draft for 1 and 2:
>>> >>
>>> >> interface CatalogTableContext {
>>> >>     ObjectPath getTablePath();
>>> >>     CatalogTable getTable();
>>> >>     ReadableConfig getTableConfig();
>>> >>     boolean isStreamingMode();
>>> >> }
>>> >>
>>> >> public interface TableSourceFactory<T> extends TableFactory {
>>> >>
>>> >>     default TableSource<T> createTableSource(CatalogTableContext
>>> context) {
>>> >>        return createTableSource(context.getTablePath(),
>>> context.getTable());
>>> >>     }
>>> >>
>>> >>     ......
>>> >> }
>>> >>
>>> >> Proposed change draft for 3:
>>> >>
>>> >> public interface TableFactory {
>>> >>
>>> >>     TableValidators validators();
>>> >>
>>> >>     interface TableValidators {
>>> >>        ConnectorDescriptorValidator connectorValidator();
>>> >>        TableSchemaValidator schemaValidator();
>>> >>        FormatDescriptorValidator formatValidator();
>>> >>     }
>>> >> }
>>> >>
>>> >> What do you think?
>>> >>
>>> >> [1] https://issues.apache.org/jira/browse/FLINK-15290
>>> >> [2]
>>> >>
>>> >>
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
>>> >> [3] https://issues.apache.org/jira/browse/FLINK-15509
>>> >>
>>> >> Best,
>>> >> Jingsong Lee
>>> >>
>>> >
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>
> --
> Best, Jingsong Lee
>


--
Best, Jingsong Lee

Re: [DISCUSS] Improve TableFactory

Jingsong Li
So the interface will be:

public interface TableSourceFactory<T> extends TableFactory {

   ......

   /**
    * Creates and configures a {@link TableSource} based on the given {@link Context}.
    *
    * @param context context of this table source.
    * @return the configured table source.
    */
   default TableSource<T> createTableSource(Context context) {
      ObjectIdentifier tableIdentifier = context.getTableIdentifier();
      return createTableSource(
            new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
            context.getTable());
   }

   /**
    * Context of table source creation. Contains table information and environment information.
    */
   interface Context {

      /**
       * @return full identifier of the given {@link CatalogTable}.
       */
      ObjectIdentifier getTableIdentifier();

      /**
       * @return table {@link CatalogTable} instance.
       */
      CatalogTable getTable();

      /**
       * @return readable config of this table environment.
       */
      ReadableConfig getTableConfig();
   }
}

public interface TableSinkFactory<T> extends TableFactory {

   ......

   /**
    * Creates and configures a {@link TableSink} based on the given {@link Context}.
    *
    * @param context context of this table sink.
    * @return the configured table sink.
    */
   default TableSink<T> createTableSink(Context context) {
      ObjectIdentifier tableIdentifier = context.getTableIdentifier();
      return createTableSink(
            new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
            context.getTable());
   }

   /**
    * Context of table sink creation. Contains table information and environment information.
    */
   interface Context {

      /**
       * @return full identifier of the given {@link CatalogTable}.
       */
      ObjectIdentifier getTableIdentifier();

      /**
       * @return table {@link CatalogTable} instance.
       */
      CatalogTable getTable();

      /**
       * @return readable config of this table environment.
       */
      ReadableConfig getTableConfig();
   }
}
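The backward-compatibility trick in the proposal is the default method: the new context-based entry point converts the full identifier back to the legacy (database, object) pair and delegates to the old signature, so existing factories keep working. A self-contained sketch of that pattern, with simplified stand-in types (not the real Flink `ObjectIdentifier`/`ObjectPath` classes):

```java
// Sketch of the default-method bridge between the new context-based API
// and the legacy (ObjectPath, CatalogTable) signature. All types here are
// simplified stand-ins for illustration only.
public class DefaultBridgeSketch {

    static final class ObjectIdentifier {
        final String catalog, database, object;
        ObjectIdentifier(String catalog, String database, String object) {
            this.catalog = catalog; this.database = database; this.object = object;
        }
    }

    static final class ObjectPath {
        final String database, object;
        ObjectPath(String database, String object) {
            this.database = database; this.object = object;
        }
    }

    interface LegacyFactory {
        String createTableSource(ObjectPath path); // old, pre-context method

        // New entry point: the default delegates to the legacy signature,
        // so existing implementations need no changes.
        default String createTableSource(ObjectIdentifier id) {
            return createTableSource(new ObjectPath(id.database, id.object));
        }
    }

    public static void main(String[] args) {
        // An "existing" factory that only implements the legacy method.
        LegacyFactory legacy = path -> "source:" + path.database + "." + path.object;
        String s = legacy.createTableSource(new ObjectIdentifier("cat", "db", "t"));
        System.out.println(s); // source:db.t
    }
}
```

Note that the catalog name is dropped in the bridge, which is exactly why new implementations should override the context-based method directly.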


Best,
Jingsong Lee



--
Best, Jingsong Lee

Re: [DISCUSS] Improve TableFactory

Timo Walther-2
Hi Jingsong,

some last minute changes from my side:

1. Rename `getTableIdentifier` to `getObjectIdentifier` to keep the API
obvious. Otherwise people expect a `TableIdentifier` class to be
returned here.

2. Rename `getTableConfig` to `getConfiguration()`; in the future this
will not only be a "table" config but might give access to the full
Flink configuration.
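A sketch of how the renamed methods could look on the Context, and how a connector would combine the identifier with the session configuration (the original pain point behind FLINK-15290). The types below are simplified stand-ins, not the real Flink classes; a `Map` stands in for `ReadableConfig`, and the option key is only an example:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the Context after the proposed renames: getObjectIdentifier()
// and getConfiguration(). Stand-in types for illustration only.
public class RenamedContextSketch {

    static final class ObjectIdentifier {
        final String catalog, database, object;
        ObjectIdentifier(String catalog, String database, String object) {
            this.catalog = catalog; this.database = database; this.object = object;
        }
        String asSummaryString() { return catalog + "." + database + "." + object; }
    }

    interface Context {
        ObjectIdentifier getObjectIdentifier(); // was getTableIdentifier()
        Map<String, String> getConfiguration(); // was getTableConfig()
    }

    // A connector reading a session option while building its source.
    static String describeSource(Context ctx) {
        String parallelism = ctx.getConfiguration()
                .getOrDefault("table.exec.resource.default-parallelism", "1");
        return ctx.getObjectIdentifier().asSummaryString()
                + "[parallelism=" + parallelism + "]";
    }

    public static void main(String[] args) {
        final Map<String, String> conf = new HashMap<>();
        conf.put("table.exec.resource.default-parallelism", "4");
        Context ctx = new Context() {
            public ObjectIdentifier getObjectIdentifier() {
                return new ObjectIdentifier("cat", "db", "t");
            }
            public Map<String, String> getConfiguration() { return conf; }
        };
        System.out.println(describeSource(ctx)); // cat.db.t[parallelism=4]
    }
}
```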

Thanks,
Timo


>>>>>>>
>>>>>>> Motivation:
>>>>>>> Now the main needs and problems are:
>>>>>>> 1.Connector can't get TableConfig [1], and some behaviors really
>>>>> need to be
>>>>>>> controlled by the user's table configuration. In the era of catalog,
>>>>> we
>>>>>>> can't put these config in connector properties, which is too
>>>>> inconvenient.
>>>>>>> 2.Connector can't know if this is batch or stream execution mode.
>>>>> But the
>>>>>>> sink implementation of batch and stream is totally different. I
>>>>> understand
>>>>>>> there is an update mode property now, but it splits the batch and
>>>>> stream in
>>>>>>> the catalog dimension. In fact, this information can be obtained
>>>>> through
>>>>>>> the current TableEnvironment.
>>>>>>> 3.No interface to call validation. Now our validation is more util
>>>>> classes.
>>>>>>> It depends on whether or not the connector calls. Now we have some
>>>>> new
>>>>>>> validations to add, such as [2], which is really confuse uses, even
>>>>>>> developers. Another problem is that our SQL update (DDL) does not
>>>>> have
>>>>>>> validation [3]. It is better to report an error when executing DDL,
>>>>>>> otherwise it will confuse the user.
>>>>>>>
>>>>>>> Proposed change draft for 1 and 2:
>>>>>>>
>>>>>>> interface CatalogTableContext {
>>>>>>>      ObjectPath getTablePath();
>>>>>>>      CatalogTable getTable();
>>>>>>>      ReadableConfig getTableConfig();
>>>>>>>      boolean isStreamingMode();
>>>>>>> }
>>>>>>>
>>>>>>> public interface TableSourceFactory<T> extends TableFactory {
>>>>>>>
>>>>>>>      default TableSource<T> createTableSource(CatalogTableContext
>>>>> context) {
>>>>>>>         return createTableSource(context.getTablePath(),
>>>>> context.getTable());
>>>>>>>      }
>>>>>>>
>>>>>>>      ......
>>>>>>> }
>>>>>>>
>>>>>>> Proposed change draft for 3:
>>>>>>>
>>>>>>> public interface TableFactory {
>>>>>>>
>>>>>>>      TableValidators validators();
>>>>>>>
>>>>>>>      interface TableValidators {
>>>>>>>         ConnectorDescriptorValidator connectorValidator();
>>>>>>>         TableSchemaValidator schemaValidator();
>>>>>>>         FormatDescriptorValidator formatValidator();
>>>>>>>      }
>>>>>>> }
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-15290
>>>>>>> [2]
>>>>>>>
>>>>>>>
>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15509
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong Lee
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>


Re: [DISCUSS] Improve TableFactory

Jingsong Li
Hi Timo,

Good catch!

I really love idea 2; a full Flink config looks very good to me.

Trying to understand your first point: we don't actually have a `TableIdentifier`
class now, but a TableFactory already indicates a table, so I am OK with it.

The new Context should be:

   /**
    * Context of table source creation. Contains table information and
environment information.
    */
   interface Context {
      /**
       * @return full identifier of the given {@link CatalogTable}.
       */
      ObjectIdentifier getObjectIdentifier();
      /**
       * @return table {@link CatalogTable} instance.
       */
      CatalogTable getTable();
      /**
       * @return readable config of this table environment.
       */
      ReadableConfig getConfiguration();
   }
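
To make the proposed `Context` pattern concrete, here is a minimal, self-contained
sketch of how a connector factory could consume such a context. All types below are
simplified stand-ins for the real Flink classes (`ObjectIdentifier`, `ReadableConfig`,
`TableSource`), and the config key is only illustrative — this sketches the shape of
the API, not Flink's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class ContextSketch {

    // Simplified stand-in for org.apache.flink.table.catalog.ObjectIdentifier.
    static final class ObjectIdentifier {
        final String catalog, database, object;
        ObjectIdentifier(String catalog, String database, String object) {
            this.catalog = catalog;
            this.database = database;
            this.object = object;
        }
    }

    // Simplified stand-in for ReadableConfig: read-only access to the config.
    interface ReadableConfig {
        String get(String key, String defaultValue);
    }

    interface TableSourceFactory<T> {
        // The proposed creation context: table identity plus environment config.
        interface Context {
            ObjectIdentifier getObjectIdentifier();
            ReadableConfig getConfiguration();
        }
        T createTableSource(Context context);
    }

    // A toy factory that reads both the identifier and the config from the context.
    static final class EchoFactory implements TableSourceFactory<String> {
        @Override
        public String createTableSource(Context ctx) {
            String parallelism = ctx.getConfiguration()
                    .get("table.exec.resource.default-parallelism", "1");
            return ctx.getObjectIdentifier().object + " (parallelism=" + parallelism + ")";
        }
    }

    static String demo() {
        final Map<String, String> conf = new HashMap<>();
        conf.put("table.exec.resource.default-parallelism", "4");
        TableSourceFactory.Context ctx = new TableSourceFactory.Context() {
            public ObjectIdentifier getObjectIdentifier() {
                return new ObjectIdentifier("myCatalog", "myDb", "myTable");
            }
            public ReadableConfig getConfiguration() {
                return (key, def) -> conf.getOrDefault(key, def);
            }
        };
        return new EchoFactory().createTableSource(ctx);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints: myTable (parallelism=4)
    }
}
```

The design benefit is visible here: new fields can be added to `Context` later
without changing the factory method signature that every connector implements.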


Best,
Jingsong Lee

On Tue, Feb 4, 2020 at 8:51 PM Timo Walther <[hidden email]> wrote:

> Hi Jingsong,
>
> some last minute changes from my side:
>
> 1. rename `getTableIdentifier` to `getObjectIdentifier` to keep the API
> obvious. Otherwise people expect a `TableIdentifier` class being
> returned here.
>
> 2. rename `getTableConfig` to `getConfiguration()`: in the future this
> will not only be a "table" config but might give access to the full
> Flink config.
>
> Thanks,
> Timo
>
>
> On 04.02.20 06:27, Jingsong Li wrote:
> > So the interface will be:
> >
> > public interface TableSourceFactory<T> extends TableFactory {
> >     ......
> >
> >     /**
> >      * Creates and configures a {@link TableSource} based on the given
> > {@link Context}.
> >      *
> >      * @param context context of this table source.
> >      * @return the configured table source.
> >      */
> >     default TableSource<T> createTableSource(Context context) {
> >        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >        return createTableSource(
> >              new ObjectPath(tableIdentifier.getDatabaseName(),
> > tableIdentifier.getObjectName()),
> >              context.getTable());
> >     }
> >     /**
> >      * Context of table source creation. Contains table information and
> > environment information.
> >      */
> >     interface Context {
> >        /**
> >         * @return full identifier of the given {@link CatalogTable}.
> >         */
> >        ObjectIdentifier getTableIdentifier();
> >        /**
> >         * @return table {@link CatalogTable} instance.
> >         */
> >        CatalogTable getTable();
> >        /**
> >         * @return readable config of this table environment.
> >         */
> >        ReadableConfig getTableConfig();
> >     }
> > }
> >
> > public interface TableSinkFactory<T> extends TableFactory {
> >     ......
> >     /**
> >      * Creates and configures a {@link TableSink} based on the given
> > {@link Context}.
> >      *
> >      * @param context context of this table sink.
> >      * @return the configured table sink.
> >      */
> >     default TableSink<T> createTableSink(Context context) {
> >        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >        return createTableSink(
> >              new ObjectPath(tableIdentifier.getDatabaseName(),
> > tableIdentifier.getObjectName()),
> >              context.getTable());
> >     }
> >     /**
> >      * Context of table sink creation. Contains table information and
> > environment information.
> >      */
> >     interface Context {
> >        /**
> >         * @return full identifier of the given {@link CatalogTable}.
> >         */
> >        ObjectIdentifier getTableIdentifier();
> >        /**
> >         * @return table {@link CatalogTable} instance.
> >         */
> >        CatalogTable getTable();
> >        /**
> >         * @return readable config of this table environment.
> >         */
> >        ReadableConfig getTableConfig();
> >     }
> > }
> >
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li <[hidden email]>
> wrote:
> >
> >> Hi all,
> >>
> >> After rethinking and a discussion with Kurt, I'd like to remove
> >> "isBounded".
> >> We can defer this "is bounded" information to the TableSink.
> >> With the TableSink refactor, we need to consider "consumeDataStream"
> >> and "consumeBoundedStream".
> >>
> >> Best,
> >> Jingsong Lee
> >>
> >> On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <[hidden email]>
> wrote:
> >>
> >>> Hi Jark,
> >>>
> >>> Thanks for getting involved. Yes, it's hard to understand adding
> >>> isBounded on the source.
> >>> I recommend adding it only to the sink at present, because the sink
> >>> has an upstream, and its upstream is either bounded or unbounded.
> >>>
> >>> Hi all,
> >>>
> >>> Let me summarize with your suggestions.
> >>>
> >>> public interface TableSourceFactory<T> extends TableFactory {
> >>>
> >>>     ......
> >>>
> >>>
> >>>     /**
> >>>      * Creates and configures a {@link TableSource} based on the given
> {@link Context}.
> >>>      *
> >>>      * @param context context of this table source.
> >>>      * @return the configured table source.
> >>>      */
> >>>     default TableSource<T> createTableSource(Context context) {
> >>>        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >>>        return createTableSource(
> >>>              new ObjectPath(tableIdentifier.getDatabaseName(),
> tableIdentifier.getObjectName()),
> >>>              context.getTable());
> >>>     }
> >>>
> >>>     /**
> >>>      * Context of table source creation. Contains table information
> and environment information.
> >>>      */
> >>>     interface Context {
> >>>
> >>>        /**
> >>>         * @return full identifier of the given {@link CatalogTable}.
> >>>         */
> >>>        ObjectIdentifier getTableIdentifier();
> >>>
> >>>        /**
> >>>         * @return table {@link CatalogTable} instance.
> >>>         */
> >>>        CatalogTable getTable();
> >>>
> >>>        /**
> >>>         * @return readable config of this table environment.
> >>>         */
> >>>        ReadableConfig getTableConfig();
> >>>     }
> >>> }
> >>>
> >>> public interface TableSinkFactory<T> extends TableFactory {
> >>>
> >>>     ......
> >>>
> >>>     /**
> >>>      * Creates and configures a {@link TableSink} based on the given
> {@link Context}.
> >>>      *
> >>>      * @param context context of this table sink.
> >>>      * @return the configured table sink.
> >>>      */
> >>>     default TableSink<T> createTableSink(Context context) {
> >>>        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >>>        return createTableSink(
> >>>              new ObjectPath(tableIdentifier.getDatabaseName(),
> tableIdentifier.getObjectName()),
> >>>              context.getTable());
> >>>     }
> >>>
> >>>     /**
> >>>      * Context of table sink creation. Contains table information and
> environment information.
> >>>      */
> >>>     interface Context {
> >>>
> >>>        /**
> >>>         * @return full identifier of the given {@link CatalogTable}.
> >>>         */
> >>>        ObjectIdentifier getTableIdentifier();
> >>>
> >>>        /**
> >>>         * @return table {@link CatalogTable} instance.
> >>>         */
> >>>        CatalogTable getTable();
> >>>
> >>>        /**
> >>>         * @return readable config of this table environment.
> >>>         */
> >>>        ReadableConfig getTableConfig();
> >>>
> >>>        /**
> >>>         * @return whether or not the input is bounded.
> >>>         */
> >>>        boolean isBounded();
> >>>     }
> >>> }
> >>>
> >>> If there is no objection, I will start a vote thread. (if necessary, I
> >>> can also edit a FLIP).
> >>>
> >>> Best,
> >>> Jingsong Lee
> >>>
> >>> On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <[hidden email]>
> >>> wrote:
> >>>
> >>>> Thanks Bowen and Timo for involving.
> >>>>
> >>>> Hi Bowen,
> >>>>
> >>>>> 1. is it better to have explicit APIs like
> >>>> "createBatchTableSource(...)"
> >>>> I think it is better to keep one method, since in [1], we have reached
> >>>> one in DataStream layer to maintain a single API in "env.source". I
> think
> >>>> it is good to not split batch and stream, And our
> TableSource/TableSink are
> >>>> the same class for both batch and streaming too.
> >>>>
> >>>>> 2. I'm not sure of the benefits to have a CatalogTableContext class.
> >>>> As Timo said, We may have more parameters to add in the future, take a
> >>>> look to "AbstractRichFunction.RuntimeContext", It's added little by
> little.
> >>>>
> >>>> Hi Timo,
> >>>>
> >>>> Your suggestion about Context looks good to me.
> >>>> "TablePath" used in Hive for updating the catalog information of this
> >>>> table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
> >>>>
> >>>>> Can we postpone the change of TableValidators?
> >>>> Yes, ConfigOption validation looks good to me. It seems that you have
> >>>> been thinking about this for a long time. It's very good. Looking
> forward
> >>>> to the promotion of FLIP-54.
> >>>>
> >>>> [1]
> >>>>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
> >>>>
> >>>> Best,
> >>>> Jingsong Lee
> >>>>
> >>>> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <[hidden email]>
> wrote:
> >>>>
> >>>>> Hi Jingsong,
> >>>>>
> >>>>> +1 for adding a context in the source and sink factories. A context
> >>>>> class also allows for future modifications without touching the
> >>>>> TableFactory interface again.
> >>>>>
> >>>>> How about:
> >>>>>
> >>>>> interface TableSourceFactory {
> >>>>>       interface Context {
> >>>>>          // ...
> >>>>>       }
> >>>>> }
> >>>>>
> >>>>> Because I find the name `CatalogTableContext` confusing and we can
> >>>>> bound
> >>>>> the interface to the factory class itself as an inner interface.
> >>>>>
> >>>>> Readable access to configuration sounds also right to me. Can we
> remove
> >>>>> the `ObjectPath getTablePath()` method? I don't see a reason why a
> >>>>> factory should know the path. And if so, it should be an
> >>>>> `ObjectIdentifier` instead to also know about the catalog we are
> using.
> >>>>>
> >>>>> The `isStreamingMode()` should be renamed to `isBounded()` because we
> >>>>> would like to use terminology around boundedness rather than
> >>>>> streaming/batch.
> >>>>>
> >>>>> @Bowen: We are in the process of unifying the APIs and thus
> explicitly
> >>>>> avoid specialized methods in the future.
> >>>>>
> >>>>> Can we postpone the change of TableValidators? I don't think that
> every
> >>>>> factory needs a schema validator. Ideally, the factory should just
> >>>>> return a List<ConfigOption> or ConfigOptionGroup that contains the
> >>>>> validation logic as mentioned in the validation part of FLIP-54[1].
> But
> >>>>> currently our config options are not rich enough to have a unified
> >>>>> validation. Additionally, the factory should return some properties
> >>>>> such
> >>>>> as "supports event-time" for the schema validation outside of the
> >>>>> factory itself.
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>> [1]
> >>>>>
> >>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 16.01.20 00:51, Bowen Li wrote:
> >>>>>> Hi Jingsong,
> >>>>>>
> >>>>>> The 1st and 2nd pain points you described are very valid, as I'm
> more
> >>>>>> familiar with them. I agree these are shortcomings of the current
> >>>>> Flink SQL
> >>>>>> design.
> >>>>>>
> >>>>>> A couple comments on your 1st proposal:
> >>>>>>
> >>>>>> 1. is it better to have explicit APIs like
> >>>>> "createBatchTableSource(...)"
> >>>>>> and "createStreamingTableSource(...)" in TableSourceFactory (would
> be
> >>>>>> similar for sink factory) to let planner handle which mode
> (streaming
> >>>>> vs
> >>>>>> batch) of source should be instantiated? That way we don't need to
> >>>>> always
> >>>>>> let connector developers handling an if-else on isStreamingMode.
> >>>>>> 2. I'm not sure of the benefits to have a CatalogTableContext class.
> >>>>> The
> >>>>>> path, table, and config are fairly independent of each other. So why
> >>>>> not
> >>>>>> pass the config in as 3rd parameter as `createXxxTableSource(path,
> >>>>>> catalogTable, tableConfig)?
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <[hidden email]
> >
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hi dev,
> >>>>>>>
> >>>>>>> I'd like to kick off a discussion on the improvement of
> >>>>> TableSourceFactory
> >>>>>>> and TableSinkFactory.
> >>>>>>>
> >>>>>>> Motivation:
> >>>>>>> Now the main needs and problems are:
> >>>>>>> 1.Connector can't get TableConfig [1], and some behaviors really
> >>>>> need to be
> >>>>>>> controlled by the user's table configuration. In the era of
> catalog,
> >>>>> we
> >>>>>>> can't put these config in connector properties, which is too
> >>>>> inconvenient.
> >>>>>>> 2.Connector can't know if this is batch or stream execution mode.
> >>>>> But the
> >>>>>>> sink implementation of batch and stream is totally different. I
> >>>>> understand
> >>>>>>> there is an update mode property now, but it splits the batch and
> >>>>> stream in
> >>>>>>> the catalog dimension. In fact, this information can be obtained
> >>>>> through
> >>>>>>> the current TableEnvironment.
> >>>>>>> 3.No interface to call validation. Now our validation is more util
> >>>>> classes.
> >>>>>>> It depends on whether or not the connector calls. Now we have some
> >>>>> new
> >>>>>>> validations to add, such as [2], which is really confuse uses, even
> >>>>>>> developers. Another problem is that our SQL update (DDL) does not
> >>>>> have
> >>>>>>> validation [3]. It is better to report an error when executing DDL,
> >>>>>>> otherwise it will confuse the user.
> >>>>>>>
> >>>>>>> Proposed change draft for 1 and 2:
> >>>>>>>
> >>>>>>> interface CatalogTableContext {
> >>>>>>>      ObjectPath getTablePath();
> >>>>>>>      CatalogTable getTable();
> >>>>>>>      ReadableConfig getTableConfig();
> >>>>>>>      boolean isStreamingMode();
> >>>>>>> }
> >>>>>>>
> >>>>>>> public interface TableSourceFactory<T> extends TableFactory {
> >>>>>>>
> >>>>>>>      default TableSource<T> createTableSource(CatalogTableContext
> >>>>> context) {
> >>>>>>>         return createTableSource(context.getTablePath(),
> >>>>> context.getTable());
> >>>>>>>      }
> >>>>>>>
> >>>>>>>      ......
> >>>>>>> }
> >>>>>>>
> >>>>>>> Proposed change draft for 3:
> >>>>>>>
> >>>>>>> public interface TableFactory {
> >>>>>>>
> >>>>>>>      TableValidators validators();
> >>>>>>>
> >>>>>>>      interface TableValidators {
> >>>>>>>         ConnectorDescriptorValidator connectorValidator();
> >>>>>>>         TableSchemaValidator schemaValidator();
> >>>>>>>         FormatDescriptorValidator formatValidator();
> >>>>>>>      }
> >>>>>>> }
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-15290
> >>>>>>> [2]
> >>>>>>>
> >>>>>>>
> >>>>>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
> >>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15509
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jingsong Lee
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>> --
> >>>> Best, Jingsong Lee
> >>>>
> >>>
> >>>
> >>> --
> >>> Best, Jingsong Lee
> >>>
> >>
> >>
> >> --
> >> Best, Jingsong Lee
> >>
> >
> >
>
>

--
Best, Jingsong Lee
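
A note on the compatibility story in the proposal above: because the new
`createTableSource(Context)` is a default method that delegates to the legacy
`(ObjectPath, CatalogTable)` signature, existing factories keep compiling
unchanged. A minimal, self-contained sketch of that delegation, again with
simplified stand-in types rather than the real Flink classes:

```java
public class DelegationSketch {

    // Simplified stand-ins for the Flink catalog classes.
    static final class ObjectIdentifier {
        final String catalog, database, object;
        ObjectIdentifier(String catalog, String database, String object) {
            this.catalog = catalog;
            this.database = database;
            this.object = object;
        }
    }

    static final class ObjectPath {
        final String database, object;
        ObjectPath(String database, String object) {
            this.database = database;
            this.object = object;
        }
    }

    interface TableSourceFactory<T> {
        interface Context {
            ObjectIdentifier getTableIdentifier();
            String getTable(); // stand-in for CatalogTable
        }

        // Legacy method that existing connectors already implement.
        T createTableSource(ObjectPath path, String catalogTable);

        // New entry point; the default keeps old factories working unchanged.
        default T createTableSource(Context context) {
            ObjectIdentifier id = context.getTableIdentifier();
            return createTableSource(
                    new ObjectPath(id.database, id.object), context.getTable());
        }
    }

    static String demo() {
        // A legacy-style factory that only implements the old signature.
        TableSourceFactory<String> legacy = (path, table) -> path.database + "." + path.object;
        TableSourceFactory.Context ctx = new TableSourceFactory.Context() {
            public ObjectIdentifier getTableIdentifier() {
                return new ObjectIdentifier("cat", "db", "orders");
            }
            public String getTable() {
                return "CREATE TABLE orders (...)";
            }
        };
        // The new Context-based call is routed through the default method.
        return legacy.createTableSource(ctx);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints: db.orders
    }
}
```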

Re: [DISCUSS] Improve TableFactory

Jingsong Li
Hi all,

As Jark suggested in the VOTE thread, a JIRA issue has been created:
https://issues.apache.org/jira/browse/FLINK-15912

Best,
Jingsong Lee

>> >>>>> through
>> >>>>>>> the current TableEnvironment.
>> >>>>>>> 3.No interface to call validation. Now our validation lives mostly in util
>> >>>>> classes.
>> >>>>>>> It depends on whether or not the connector calls them. Now we have some
>> >>>>> new
>> >>>>>>> validations to add, such as [2], which really confuses users, even
>> >>>>>>> developers. Another problem is that our SQL update (DDL) does not
>> >>>>> have
>> >>>>>>> validation [3]. It is better to report an error when executing
>> DDL,
>> >>>>>>> otherwise it will confuse the user.
>> >>>>>>>
>> >>>>>>> Proposed change draft for 1 and 2:
>> >>>>>>>
>> >>>>>>> interface CatalogTableContext {
>> >>>>>>>      ObjectPath getTablePath();
>> >>>>>>>      CatalogTable getTable();
>> >>>>>>>      ReadableConfig getTableConfig();
>> >>>>>>>      boolean isStreamingMode();
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>> public interface TableSourceFactory<T> extends TableFactory {
>> >>>>>>>
>> >>>>>>>      default TableSource<T> createTableSource(CatalogTableContext
>> >>>>> context) {
>> >>>>>>>         return createTableSource(context.getTablePath(),
>> >>>>> context.getTable());
>> >>>>>>>      }
>> >>>>>>>
>> >>>>>>>      ......
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>> Proposed change draft for 3:
>> >>>>>>>
>> >>>>>>> public interface TableFactory {
>> >>>>>>>
>> >>>>>>>      TableValidators validators();
>> >>>>>>>
>> >>>>>>>      interface TableValidators {
>> >>>>>>>         ConnectorDescriptorValidator connectorValidator();
>> >>>>>>>         TableSchemaValidator schemaValidator();
>> >>>>>>>         FormatDescriptorValidator formatValidator();
>> >>>>>>>      }
>> >>>>>>> }
>> >>>>>>>
>> >>>>>>> What do you think?
>> >>>>>>>
>> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-15290
>> >>>>>>> [2]
>> >>>>>>>
>> >>>>>>>
>> >>>>>
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
>> >>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15509
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Jingsong Lee
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >>>> --
>> >>>> Best, Jingsong Lee
>> >>>>
>> >>>
>> >>>
>> >>> --
>> >>> Best, Jingsong Lee
>> >>>
>> >>
>> >>
>> >> --
>> >> Best, Jingsong Lee
>> >>
>> >
>> >
>>
>>
>
> --
> Best, Jingsong Lee
>


--
Best, Jingsong Lee

Re: [DISCUSS] Improve TableFactory

Rui Li
+1, thanks for the efforts.

On Wed, Feb 5, 2020 at 4:00 PM Jingsong Li <[hidden email]> wrote:

> Hi all,
>
> As Jark suggested in VOTE thread.
> JIRA created: https://issues.apache.org/jira/browse/FLINK-15912
>
> Best,
> Jingsong Lee
>
> On Wed, Feb 5, 2020 at 10:57 AM Jingsong Li <[hidden email]>
> wrote:
>
> > Hi Timo,
> >
> > Good catch!
> >
> > I really love the idea 2, a full Flink config looks very good to me.
> >
> > Trying to understand your first one: actually we don't have a
> `TableIdentifier`
> > class now. But TableFactory already indicates the table, so I am OK.
> >
> > New Context should be:
> >
> >    /**
> >     * Context of table source creation. Contains table information and
> environment information.
> >     */
> >    interface Context {
> >       /**
> >        * @return full identifier of the given {@link CatalogTable}.
> >        */
> >       ObjectIdentifier getObjectIdentifier();
> >       /**
> >        * @return table {@link CatalogTable} instance.
> >        */
> >       CatalogTable getTable();
> >       /**
> >        * @return readable config of this table environment.
> >        */
> >       ReadableConfig getConfiguration();
> >    }
> >
> >
> > Best,
> > Jingsong Lee
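To see how a connector would consume this Context, here is a minimal, self-contained sketch. All types and option keys below (`ObjectIdentifier`, `CatalogTable`, `ReadableConfig`, `"table.exec.default-parallelism"`) are simplified stand-ins for illustration, not Flink's actual API.

```java
import java.util.Map;

// Sketch only: simplified stand-ins for Flink's ObjectIdentifier,
// CatalogTable and ReadableConfig, to show how a factory would use Context.
public class ContextSketch {
    interface ReadableConfig { String getOrDefault(String key, String fallback); }
    interface CatalogTable { Map<String, String> getOptions(); }
    record ObjectIdentifier(String catalog, String database, String object) {}

    interface Context {
        ObjectIdentifier getObjectIdentifier();
        CatalogTable getTable();
        ReadableConfig getConfiguration();
    }

    // A toy "factory" method: combines per-table options (from the catalog)
    // with environment-level configuration (from the TableEnvironment).
    static String describeSource(Context ctx) {
        ObjectIdentifier id = ctx.getObjectIdentifier();
        String path = ctx.getTable().getOptions().getOrDefault("path", "<none>");
        String parallelism = ctx.getConfiguration()
                .getOrDefault("table.exec.default-parallelism", "1");
        return id.catalog() + "." + id.database() + "." + id.object()
                + " path=" + path + " parallelism=" + parallelism;
    }

    static Context sampleContext() {
        return new Context() {
            public ObjectIdentifier getObjectIdentifier() {
                return new ObjectIdentifier("default_catalog", "default_database", "t1");
            }
            public CatalogTable getTable() {
                return () -> Map.of("path", "/tmp/data");
            }
            public ReadableConfig getConfiguration() {
                return (key, fallback) ->
                        key.equals("table.exec.default-parallelism") ? "4" : fallback;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(describeSource(sampleContext()));
    }
}
```

The point of the single `Context` argument is visible here: new information can be added to the context later without breaking existing factory implementations.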
> >
> > On Tue, Feb 4, 2020 at 8:51 PM Timo Walther <[hidden email]> wrote:
> >
> >> Hi Jingsong,
> >>
> >> some last minute changes from my side:
> >>
> >> 1. rename `getTableIdentifier` to `getObjectIdentifier` to keep the API
> >> obvious. Otherwise people expect a `TableIdentifier` class being
> >> returned here.
> >>
> >> 2. rename `getTableConfig` to `getConfiguration()`; in the future this
> >> will not only be a "table" config but might give access to the full
> >> Flink config.
> >>
> >> Thanks,
> >> Timo
> >>
> >>
> >> On 04.02.20 06:27, Jingsong Li wrote:
> >> > So the interface will be:
> >> >
> >> > public interface TableSourceFactory<T> extends TableFactory {
> >> >     ......
> >> >
> >> >     /**
> >> >      * Creates and configures a {@link TableSource} based on the given
> >> > {@link Context}.
> >> >      *
> >> >      * @param context context of this table source.
> >> >      * @return the configured table source.
> >> >      */
> >> >     default TableSource<T> createTableSource(Context context) {
> >> >        ObjectIdentifier tableIdentifier =
> context.getTableIdentifier();
> >> >        return createTableSource(
> >> >              new ObjectPath(tableIdentifier.getDatabaseName(),
> >> > tableIdentifier.getObjectName()),
> >> >              context.getTable());
> >> >     }
> >> >     /**
> >> >      * Context of table source creation. Contains table information
> and
> >> > environment information.
> >> >      */
> >> >     interface Context {
> >> >        /**
> >> >         * @return full identifier of the given {@link CatalogTable}.
> >> >         */
> >> >        ObjectIdentifier getTableIdentifier();
> >> >        /**
> >> >         * @return table {@link CatalogTable} instance.
> >> >         */
> >> >        CatalogTable getTable();
> >> >        /**
> >> >         * @return readable config of this table environment.
> >> >         */
> >> >        ReadableConfig getTableConfig();
> >> >     }
> >> > }
> >> >
> >> > public interface TableSinkFactory<T> extends TableFactory {
> >> >     ......
> >> >     /**
> >> >      * Creates and configures a {@link TableSink} based on the given
> >> > {@link Context}.
> >> >      *
> >> >      * @param context context of this table sink.
> >> >      * @return the configured table sink.
> >> >      */
> >> >     default TableSink<T> createTableSink(Context context) {
> >> >        ObjectIdentifier tableIdentifier =
> context.getTableIdentifier();
> >> >        return createTableSink(
> >> >              new ObjectPath(tableIdentifier.getDatabaseName(),
> >> > tableIdentifier.getObjectName()),
> >> >              context.getTable());
> >> >     }
> >> >     /**
> >> >      * Context of table sink creation. Contains table information and
> >> > environment information.
> >> >      */
> >> >     interface Context {
> >> >        /**
> >> >         * @return full identifier of the given {@link CatalogTable}.
> >> >         */
> >> >        ObjectIdentifier getTableIdentifier();
> >> >        /**
> >> >         * @return table {@link CatalogTable} instance.
> >> >         */
> >> >        CatalogTable getTable();
> >> >        /**
> >> >         * @return readable config of this table environment.
> >> >         */
> >> >        ReadableConfig getTableConfig();
> >> >     }
> >> > }
> >> >
> >> >
> >> > Best,
> >> > Jingsong Lee
> >> >
> >> > On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li <[hidden email]>
> >> wrote:
> >> >
> >> >> Hi all,
> >> >>
> >> >> After rethinking and discussion with Kurt, I'd like to remove
> >> "isBounded".
> >> >> We can delay this "is bounded" information to the TableSink.
> >> >> With the TableSink refactor, we need to consider "consumeDataStream"
> >> >> and "consumeBoundedStream".
> >> >>
> >> >> Best,
> >> >> Jingsong Lee
> >> >>
> >> >> On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <[hidden email]>
> >> wrote:
> >> >>
> >> >>> Hi Jark,
> >> >>>
> >> >>> Thanks for joining; yes, it's hard to understand adding isBounded on
> the
> >> >>> source.
> >> >>> I recommend adding only to sink at present, because sink has
> upstream.
> >> >>> Its upstream is either bounded or unbounded.
> >> >>>
> >> >>> Hi all,
> >> >>>
> >> >>> Let me summarize with your suggestions.
> >> >>>
> >> >>> public interface TableSourceFactory<T> extends TableFactory {
> >> >>>
> >> >>>     ......
> >> >>>
> >> >>>
> >> >>>     /**
> >> >>>      * Creates and configures a {@link TableSource} based on the
> >> given {@link Context}.
> >> >>>      *
> >> >>>      * @param context context of this table source.
> >> >>>      * @return the configured table source.
> >> >>>      */
> >> >>>     default TableSource<T> createTableSource(Context context) {
> >> >>>        ObjectIdentifier tableIdentifier =
> >> context.getTableIdentifier();
> >> >>>        return createTableSource(
> >> >>>              new ObjectPath(tableIdentifier.getDatabaseName(),
> >> tableIdentifier.getObjectName()),
> >> >>>              context.getTable());
> >> >>>     }
> >> >>>
> >> >>>     /**
> >> >>>      * Context of table source creation. Contains table information
> >> and environment information.
> >> >>>      */
> >> >>>     interface Context {
> >> >>>
> >> >>>        /**
> >> >>>         * @return full identifier of the given {@link CatalogTable}.
> >> >>>         */
> >> >>>        ObjectIdentifier getTableIdentifier();
> >> >>>
> >> >>>        /**
> >> >>>         * @return table {@link CatalogTable} instance.
> >> >>>         */
> >> >>>        CatalogTable getTable();
> >> >>>
> >> >>>        /**
> >> >>>         * @return readable config of this table environment.
> >> >>>         */
> >> >>>        ReadableConfig getTableConfig();
> >> >>>     }
> >> >>> }
> >> >>>
> >> >>> public interface TableSinkFactory<T> extends TableFactory {
> >> >>>
> >> >>>     ......
> >> >>>
> >> >>>     /**
> >> >>>      * Creates and configures a {@link TableSink} based on the given
> >> {@link Context}.
> >> >>>      *
> >> >>>      * @param context context of this table sink.
> >> >>>      * @return the configured table sink.
> >> >>>      */
> >> >>>     default TableSink<T> createTableSink(Context context) {
> >> >>>        ObjectIdentifier tableIdentifier =
> >> context.getTableIdentifier();
> >> >>>        return createTableSink(
> >> >>>              new ObjectPath(tableIdentifier.getDatabaseName(),
> >> tableIdentifier.getObjectName()),
> >> >>>              context.getTable());
> >> >>>     }
> >> >>>
> >> >>>     /**
> >> >>>      * Context of table sink creation. Contains table information
> and
> >> environment information.
> >> >>>      */
> >> >>>     interface Context {
> >> >>>
> >> >>>        /**
> >> >>>         * @return full identifier of the given {@link CatalogTable}.
> >> >>>         */
> >> >>>        ObjectIdentifier getTableIdentifier();
> >> >>>
> >> >>>        /**
> >> >>>         * @return table {@link CatalogTable} instance.
> >> >>>         */
> >> >>>        CatalogTable getTable();
> >> >>>
> >> >>>        /**
> >> >>>         * @return readable config of this table environment.
> >> >>>         */
> >> >>>        ReadableConfig getTableConfig();
> >> >>>
> >> >>>        /**
> >> >>>         * @return Input whether or not it is bounded.
> >> >>>         */
> >> >>>        boolean isBounded();
> >> >>>     }
> >> >>> }
> >> >>>
> >> >>> If there is no objection, I will start a vote thread. (if
> necessary, I
> >> >>> can also edit a FLIP).
> >> >>>
> >> >>> Best,
> >> >>> Jingsong Lee
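The `isBounded()` flag proposed for the sink context above can be sketched as follows. Both types here are simplified stand-ins for illustration; the actual batch/streaming sink behaviors (final atomic commit vs. checkpoint-based incremental commits) are only hinted at by the returned labels.

```java
// Sketch: how a sink factory could branch on the proposed Context#isBounded()
// instead of a catalog-level update-mode property.
public class BoundedSinkSketch {
    interface Context { boolean isBounded(); }
    interface TableSink { String mode(); }

    static TableSink createTableSink(Context ctx) {
        // A bounded upstream allows e.g. a single final commit; an unbounded
        // upstream needs incremental, checkpoint-based commits.
        if (ctx.isBounded()) {
            return () -> "batch";
        }
        return () -> "streaming";
    }

    public static void main(String[] args) {
        System.out.println(createTableSink(() -> true).mode());   // batch
        System.out.println(createTableSink(() -> false).mode());  // streaming
    }
}
```

This also shows why the flag makes sense for sinks but not sources: a sink's upstream is always either bounded or unbounded at planning time.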
> >> >>>
> >> >>> On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <[hidden email]
> >
> >> >>> wrote:
> >> >>>
> >> >>>> Thanks Bowen and Timo for getting involved.
> >> >>>>
> >> >>>> Hi Bowen,
> >> >>>>
> >> >>>>> 1. is it better to have explicit APIs like
> >> >>>> "createBatchTableSource(...)"
> >> >>>> I think it is better to keep one method, since in [1], we have
> >> reached
> >> >>>> one in DataStream layer to maintain a single API in "env.source". I
> >> think
> >> >>>> it is good to not split batch and stream, And our
> >> TableSource/TableSink are
> >> >>>> the same class for both batch and streaming too.
> >> >>>>
> >> >>>>> 2. I'm not sure of the benefits to have a CatalogTableContext
> class.
> >> >>>> As Timo said, We may have more parameters to add in the future,
> take
> >> a
> >> >>>> look at "AbstractRichFunction.RuntimeContext"; it's added little by
> >> little.
> >> >>>>
> >> >>>> Hi Timo,
> >> >>>>
> >> >>>> Your suggestion about Context looks good to me.
> >> >>>> "TablePath" is used in Hive for updating the catalog information of
> this
> >> >>>> table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
> >> >>>>
> >> >>>>> Can we postpone the change of TableValidators?
> >> >>>> Yes, ConfigOption validation looks good to me. It seems that you
> have
> >> >>>> been thinking about this for a long time. It's very good. Looking
> >> forward
> >> >>>> to the promotion of FLIP-54.
> >> >>>>
> >> >>>> [1]
> >> >>>>
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
> >> >>>>
> >> >>>> Best,
> >> >>>> Jingsong Lee
> >> >>>>
> >> >>>> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <[hidden email]>
> >> wrote:
> >> >>>>
> >> >>>>> Hi Jingsong,
> >> >>>>>
> >> >>>>> +1 for adding a context in the source and sink factories. A
> context
> >> >>>>> class also allows for future modifications without touching the
> >> >>>>> TableFactory interface again.
> >> >>>>>
> >> >>>>> How about:
> >> >>>>>
> >> >>>>> interface TableSourceFactory {
> >> >>>>>       interface Context {
> >> >>>>>          // ...
> >> >>>>>       }
> >> >>>>> }
> >> >>>>>
> >> >>>>> Because I find the name `CatalogTableContext` confusing and we can
> >> >>>>> bound
> >> >>>>> the interface to the factory class itself as an inner interface.
> >> >>>>>
> >> >>>>> Readable access to configuration sounds also right to me. Can we
> >> remove
> >> >>>>> the `ObjectPath getTablePath()` method? I don't see a reason why a
> >> >>>>> factory should know the path. And if so, it should be an
> >> >>>>> `ObjectIdentifier` instead to also know about the catalog we are
> >> using.
> >> >>>>>
> >> >>>>> The `isStreamingMode()` should be renamed to `isBounded()` because
> >> we
> >> >>>>> would like to use terminology around boundedness rather than
> >> >>>>> streaming/batch.
> >> >>>>>
> >> >>>>> @Bowen: We are in the process of unifying the APIs and thus
> >> explicitly
> >> >>>>> avoid specialized methods in the future.
> >> >>>>>
> >> >>>>> Can we postpone the change of TableValidators? I don't think that
> >> every
> >> >>>>> factory needs a schema validator. Ideally, the factory should just
> >> >>>>> return a List<ConfigOption> or ConfigOptionGroup that contains the
> >> >>>>> validation logic as mentioned in the validation part of
> FLIP-54[1].
> >> But
> >> >>>>> currently our config options are not rich enough to have a unified
> >> >>>>> validation. Additionally, the factory should return some
> properties
> >> >>>>> such
> >> >>>>> as "supports event-time" for the schema validation outside of the
> >> >>>>> factory itself.
> >> >>>>>
> >> >>>>> Regards,
> >> >>>>> Timo
> >> >>>>>
> >> >>>>> [1]
> >> >>>>>
> >> >>>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
> >> >>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>> On 16.01.20 00:51, Bowen Li wrote:
> >> >>>>>> Hi Jingsong,
> >> >>>>>>
> >> >>>>>> The 1st and 2nd pain points you described are very valid, as I'm
> >> more
> >> >>>>>> familiar with them. I agree these are shortcomings of the current
> >> >>>>> Flink SQL
> >> >>>>>> design.
> >> >>>>>>
> >> >>>>>> A couple comments on your 1st proposal:
> >> >>>>>>
> >> >>>>>> 1. is it better to have explicit APIs like
> >> >>>>> "createBatchTableSource(...)"
> >> >>>>>> and "createStreamingTableSource(...)" in TableSourceFactory
> (would
> >> be
> >> >>>>>> similar for sink factory) to let planner handle which mode
> >> (streaming
> >> >>>>> vs
> >> >>>>>> batch) of source should be instantiated? That way we don't need
> to
> >> >>>>> always
> >> >>>>>> let connector developers handling an if-else on isStreamingMode.
> >> >>>>>> 2. I'm not sure of the benefits to have a CatalogTableContext
> >> class.
> >> >>>>> The
> >> >>>>>> path, table, and config are fairly independent of each other. So
> >> why
> >> >>>>> not
> >> >>>>>> pass the config in as 3rd parameter as
> `createXxxTableSource(path,
> >> >>>>>> catalogTable, tableConfig)?
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <
> >> [hidden email]>
> >> >>>>> wrote:
> >> >>>>>>
> >> >>>>>>> Hi dev,
> >> >>>>>>>
> >> >>>>>>> I'd like to kick off a discussion on the improvement of
> >> >>>>> TableSourceFactory
> >> >>>>>>> and TableSinkFactory.
> >> >>>>>>>
> >> >>>>>>> Motivation:
> >> >>>>>>> Now the main needs and problems are:
> >> >>>>>>> 1.Connector can't get TableConfig [1], and some behaviors really
> >> >>>>> need to be
> >> >>>>>>> controlled by the user's table configuration. In the era of
> >> catalog,
> >> >>>>> we
> >> >>>>>>> can't put these config in connector properties, which is too
> >> >>>>> inconvenient.
> >> >>>>>>> 2.Connector can't know if this is batch or stream execution
> mode.
> >> >>>>> But the
> >> >>>>>>> sink implementation of batch and stream is totally different. I
> >> >>>>> understand
> >> >>>>>>> there is an update mode property now, but it splits the batch
> and
> >> >>>>> stream in
> >> >>>>>>> the catalog dimension. In fact, this information can be obtained
> >> >>>>> through
> >> >>>>>>> the current TableEnvironment.
> >> >>>>>>> 3.No interface to call validation. Now our validation lives mostly in
> util
> >> >>>>> classes.
> >> >>>>>>> It depends on whether or not the connector calls them. Now we have
> some
> >> >>>>> new
> >> >>>>>>> validations to add, such as [2], which really confuses users, even
> >> >>>>>>> developers. Another problem is that our SQL update (DDL) does
> not
> >> >>>>> have
> >> >>>>>>> validation [3]. It is better to report an error when executing
> >> DDL,
> >> >>>>>>> otherwise it will confuse the user.
> >> >>>>>>>
> >> >>>>>>> Proposed change draft for 1 and 2:
> >> >>>>>>>
> >> >>>>>>> interface CatalogTableContext {
> >> >>>>>>>      ObjectPath getTablePath();
> >> >>>>>>>      CatalogTable getTable();
> >> >>>>>>>      ReadableConfig getTableConfig();
> >> >>>>>>>      boolean isStreamingMode();
> >> >>>>>>> }
> >> >>>>>>>
> >> >>>>>>> public interface TableSourceFactory<T> extends TableFactory {
> >> >>>>>>>
> >> >>>>>>>      default TableSource<T>
> createTableSource(CatalogTableContext
> >> >>>>> context) {
> >> >>>>>>>         return createTableSource(context.getTablePath(),
> >> >>>>> context.getTable());
> >> >>>>>>>      }
> >> >>>>>>>
> >> >>>>>>>      ......
> >> >>>>>>> }
> >> >>>>>>>
> >> >>>>>>> Proposed change draft for 3:
> >> >>>>>>>
> >> >>>>>>> public interface TableFactory {
> >> >>>>>>>
> >> >>>>>>>      TableValidators validators();
> >> >>>>>>>
> >> >>>>>>>      interface TableValidators {
> >> >>>>>>>         ConnectorDescriptorValidator connectorValidator();
> >> >>>>>>>         TableSchemaValidator schemaValidator();
> >> >>>>>>>         FormatDescriptorValidator formatValidator();
> >> >>>>>>>      }
> >> >>>>>>> }
> >> >>>>>>>
> >> >>>>>>> What do you think?
> >> >>>>>>>
> >> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-15290
> >> >>>>>>> [2]
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
> >> >>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15509
> >> >>>>>>>
> >> >>>>>>> Best,
> >> >>>>>>> Jingsong Lee
> >> >>>>>>>
> >> >>>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>
> >> >>>> --
> >> >>>> Best, Jingsong Lee
> >> >>>>
> >> >>>
> >> >>>
> >> >>> --
> >> >>> Best, Jingsong Lee
> >> >>>
> >> >>
> >> >>
> >> >> --
> >> >> Best, Jingsong Lee
> >> >>
> >> >
> >> >
> >>
> >>
> >
> > --
> > Best, Jingsong Lee
> >
>
>
> --
> Best, Jingsong Lee
>


--
Best regards!
Rui Li

Re: [DISCUSS] Improve TableFactory

Jark Wu-2
Hi all,

I would like to add an additional method `getClassloader()` into the
context.
Because a TableFactory may require this classloader to find another
TableFactory,
e.g. we will find format factory in KafkaTableSourceSinkFactory.
See FLINK-15992.
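To illustrate why the context needs a classloader, here is a sketch using plain `java.util.ServiceLoader` with a hypothetical `FormatFactory` interface. Flink's actual lookup goes through its own factory-discovery service, which is not reproduced here; the point is only that the discovering factory must be handed the right classloader, since the format implementation may live on the user classloader rather than the connector's own.

```java
import java.util.ServiceLoader;

// Sketch: a connector factory (e.g. Kafka's) discovering a format factory
// on a given classloader. FormatFactory is hypothetical, for illustration.
public class FactoryDiscoverySketch {
    public interface FormatFactory { String identifier(); }

    static FormatFactory findFormat(String identifier, ClassLoader classLoader) {
        // Discovery is scoped to the classloader passed in via the context.
        for (FormatFactory f : ServiceLoader.load(FormatFactory.class, classLoader)) {
            if (f.identifier().equals(identifier)) {
                return f;
            }
        }
        throw new IllegalArgumentException("No format factory for: " + identifier);
    }

    public static void main(String[] args) {
        try {
            findFormat("json", Thread.currentThread().getContextClassLoader());
        } catch (IllegalArgumentException e) {
            // No factories are registered in this sketch, so lookup fails cleanly.
            System.out.println(e.getMessage());
        }
    }
}
```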

I don't think we need a new VOTE for this, I just want to make this
discussion more public.
What do you think?

Best,
Jark

On Wed, 5 Feb 2020 at 16:05, Rui Li <[hidden email]> wrote:

> +1, thanks for the efforts.
>
> On Wed, Feb 5, 2020 at 4:00 PM Jingsong Li <[hidden email]> wrote:
>
> > Hi all,
> >
> > As Jark suggested in VOTE thread.
> > JIRA created: https://issues.apache.org/jira/browse/FLINK-15912
> >
> > Best,
> > Jingsong Lee
> >
> > On Wed, Feb 5, 2020 at 10:57 AM Jingsong Li <[hidden email]>
> > wrote:
> >
> > > Hi Timo,
> > >
> > > Good catch!
> > >
> > > I really love the idea 2, a full Flink config looks very good to me.
> > >
> > > Trying to understand your first one: actually we don't have a
> > `TableIdentifier`
> > > class now. But TableFactory already indicates the table, so I am OK.
> > >
> > > New Context should be:
> > >
> > >    /**
> > >     * Context of table source creation. Contains table information and
> > environment information.
> > >     */
> > >    interface Context {
> > >       /**
> > >        * @return full identifier of the given {@link CatalogTable}.
> > >        */
> > >       ObjectIdentifier getObjectIdentifier();
> > >       /**
> > >        * @return table {@link CatalogTable} instance.
> > >        */
> > >       CatalogTable getTable();
> > >       /**
> > >        * @return readable config of this table environment.
> > >        */
> > >       ReadableConfig getConfiguration();
> > >    }
> > >
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Tue, Feb 4, 2020 at 8:51 PM Timo Walther <[hidden email]>
> wrote:
> > >
> > >> Hi Jingsong,
> > >>
> > >> some last minute changes from my side:
> > >>
> > >> 1. rename `getTableIdentifier` to `getObjectIdentifier` to keep the
> API
> > >> obvious. Otherwise people expect a `TableIdentifier` class being
> > >> returned here.
> > >>
> > >> 2. rename `getTableConfig` to `getConfiguration()`; in the future this
> > >> will not only be a "table" config but might give access to the full
> > >> Flink config.
> > >>
> > >> Thanks,
> > >> Timo
> > >>
> > >>
> > >> On 04.02.20 06:27, Jingsong Li wrote:
> > >> > So the interface will be:
> > >> >
> > >> > public interface TableSourceFactory<T> extends TableFactory {
> > >> >     ......
> > >> >
> > >> >     /**
> > >> >      * Creates and configures a {@link TableSource} based on the
> given
> > >> > {@link Context}.
> > >> >      *
> > >> >      * @param context context of this table source.
> > >> >      * @return the configured table source.
> > >> >      */
> > >> >     default TableSource<T> createTableSource(Context context) {
> > >> >        ObjectIdentifier tableIdentifier =
> > context.getTableIdentifier();
> > >> >        return createTableSource(
> > >> >              new ObjectPath(tableIdentifier.getDatabaseName(),
> > >> > tableIdentifier.getObjectName()),
> > >> >              context.getTable());
> > >> >     }
> > >> >     /**
> > >> >      * Context of table source creation. Contains table information
> > and
> > >> > environment information.
> > >> >      */
> > >> >     interface Context {
> > >> >        /**
> > >> >         * @return full identifier of the given {@link CatalogTable}.
> > >> >         */
> > >> >        ObjectIdentifier getTableIdentifier();
> > >> >        /**
> > >> >         * @return table {@link CatalogTable} instance.
> > >> >         */
> > >> >        CatalogTable getTable();
> > >> >        /**
> > >> >         * @return readable config of this table environment.
> > >> >         */
> > >> >        ReadableConfig getTableConfig();
> > >> >     }
> > >> > }
> > >> >
> > >> > public interface TableSinkFactory<T> extends TableFactory {
> > >> >     ......
> > >> >     /**
> > >> >      * Creates and configures a {@link TableSink} based on the given
> > >> > {@link Context}.
> > >> >      *
> > >> >      * @param context context of this table sink.
> > >> >      * @return the configured table sink.
> > >> >      */
> > >> >     default TableSink<T> createTableSink(Context context) {
> > >> >        ObjectIdentifier tableIdentifier =
> > context.getTableIdentifier();
> > >> >        return createTableSink(
> > >> >              new ObjectPath(tableIdentifier.getDatabaseName(),
> > >> > tableIdentifier.getObjectName()),
> > >> >              context.getTable());
> > >> >     }
> > >> >     /**
> > >> >      * Context of table sink creation. Contains table information
> and
> > >> > environment information.
> > >> >      */
> > >> >     interface Context {
> > >> >        /**
> > >> >         * @return full identifier of the given {@link CatalogTable}.
> > >> >         */
> > >> >        ObjectIdentifier getTableIdentifier();
> > >> >        /**
> > >> >         * @return table {@link CatalogTable} instance.
> > >> >         */
> > >> >        CatalogTable getTable();
> > >> >        /**
> > >> >         * @return readable config of this table environment.
> > >> >         */
> > >> >        ReadableConfig getTableConfig();
> > >> >     }
> > >> > }
> > >> >
> > >> >
> > >> > Best,
> > >> > Jingsong Lee
> > >> >
> > >> > On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li <[hidden email]>
> > >> wrote:
> > >> >
> > >> >> Hi all,
> > >> >>
> > >> >> After rethinking and discussion with Kurt, I'd like to remove
> > >> "isBounded".
> > >> >> We can delay this "is bounded" information to the TableSink.
> > >> >> With the TableSink refactor, we need to consider "consumeDataStream"
> > >> >> and "consumeBoundedStream".
> > >> >>
> > >> >> Best,
> > >> >> Jingsong Lee
> > >> >>
> > >> >> On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <[hidden email]
> >
> > >> wrote:
> > >> >>
> > >> >>> Hi Jark,
> > >> >>>
> > >> >>> Thanks for joining; yes, it's hard to understand adding isBounded on
> > the
> > >> >>> source.
> > >> >>> I recommend adding only to sink at present, because sink has
> > upstream.
> > >> >>> Its upstream is either bounded or unbounded.
> > >> >>>
> > >> >>> Hi all,
> > >> >>>
> > >> >>> Let me summarize with your suggestions.
> > >> >>>
> > >> >>> public interface TableSourceFactory<T> extends TableFactory {
> > >> >>>
> > >> >>>     ......
> > >> >>>
> > >> >>>
> > >> >>>     /**
> > >> >>>      * Creates and configures a {@link TableSource} based on the
> > >> given {@link Context}.
> > >> >>>      *
> > >> >>>      * @param context context of this table source.
> > >> >>>      * @return the configured table source.
> > >> >>>      */
> > >> >>>     default TableSource<T> createTableSource(Context context) {
> > >> >>>        ObjectIdentifier tableIdentifier =
> > >> context.getTableIdentifier();
> > >> >>>        return createTableSource(
> > >> >>>              new ObjectPath(tableIdentifier.getDatabaseName(),
> > >> tableIdentifier.getObjectName()),
> > >> >>>              context.getTable());
> > >> >>>     }
> > >> >>>
> > >> >>>     /**
> > >> >>>      * Context of table source creation. Contains table
> information
> > >> and environment information.
> > >> >>>      */
> > >> >>>     interface Context {
> > >> >>>
> > >> >>>        /**
> > >> >>>         * @return full identifier of the given {@link
> CatalogTable}.
> > >> >>>         */
> > >> >>>        ObjectIdentifier getTableIdentifier();
> > >> >>>
> > >> >>>        /**
> > >> >>>         * @return table {@link CatalogTable} instance.
> > >> >>>         */
> > >> >>>        CatalogTable getTable();
> > >> >>>
> > >> >>>        /**
> > >> >>>         * @return readable config of this table environment.
> > >> >>>         */
> > >> >>>        ReadableConfig getTableConfig();
> > >> >>>     }
> > >> >>> }
> > >> >>>
> > >> >>> public interface TableSinkFactory<T> extends TableFactory {
> > >> >>>
> > >> >>>     ......
> > >> >>>
> > >> >>>     /**
> > >> >>>      * Creates and configures a {@link TableSink} based on the
> given
> > >> {@link Context}.
> > >> >>>      *
> > >> >>>      * @param context context of this table sink.
> > >> >>>      * @return the configured table sink.
> > >> >>>      */
> > >> >>>     default TableSink<T> createTableSink(Context context) {
> > >> >>>        ObjectIdentifier tableIdentifier =
> > >> context.getTableIdentifier();
> > >> >>>        return createTableSink(
> > >> >>>              new ObjectPath(tableIdentifier.getDatabaseName(),
> > >> tableIdentifier.getObjectName()),
> > >> >>>              context.getTable());
> > >> >>>     }
> > >> >>>
> > >> >>>     /**
> > >> >>>      * Context of table sink creation. Contains table information
> > and
> > >> environment information.
> > >> >>>      */
> > >> >>>     interface Context {
> > >> >>>
> > >> >>>        /**
> > >> >>>         * @return full identifier of the given {@link
> CatalogTable}.
> > >> >>>         */
> > >> >>>        ObjectIdentifier getTableIdentifier();
> > >> >>>
> > >> >>>        /**
> > >> >>>         * @return table {@link CatalogTable} instance.
> > >> >>>         */
> > >> >>>        CatalogTable getTable();
> > >> >>>
> > >> >>>        /**
> > >> >>>         * @return readable config of this table environment.
> > >> >>>         */
> > >> >>>        ReadableConfig getTableConfig();
> > >> >>>
> > >> >>>        /**
> > >> >>>         * @return Input whether or not it is bounded.
> > >> >>>         */
> > >> >>>        boolean isBounded();
> > >> >>>     }
> > >> >>> }
> > >> >>>
> > >> >>> If there is no objection, I will start a vote thread. (if
> > necessary, I
> > >> >>> can also edit a FLIP).
> > >> >>>
> > >> >>> Best,
> > >> >>> Jingsong Lee
> > >> >>>
> > >> >>> On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <
> [hidden email]
> > >
> > >> >>> wrote:
> > >> >>>
> > >> >>>> Thanks Bowen and Timo for getting involved.
> > >> >>>>
> > >> >>>> Hi Bowen,
> > >> >>>>
> > >> >>>>> 1. is it better to have explicit APIs like
> > >> >>>> "createBatchTableSource(...)"
> > >> >>>> I think it is better to keep one method, since in [1] we have reached
> > >> >>>> the same conclusion in the DataStream layer: maintain a single API in
> > >> >>>> "env.source". I think it is good not to split batch and stream, and our
> > >> >>>> TableSource/TableSink are the same classes for both batch and streaming too.
> > >> >>>>
> > >> >>>>> 2. I'm not sure of the benefits to have a CatalogTableContext
> > class.
> > >> >>>> As Timo said, we may have more parameters to add in the future; take a
> > >> >>>> look at "AbstractRichFunction.RuntimeContext", which was extended little
> > >> >>>> by little.
> > >> >>>>
> > >> >>>> Hi Timo,
> > >> >>>>
> > >> >>>> Your suggestion about Context looks good to me.
> > >> >>>> "TablePath" used in Hive for updating the catalog information of
> > this
> > >> >>>> table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
> > >> >>>>
> > >> >>>>> Can we postpone the change of TableValidators?
> > >> >>>> Yes, ConfigOption validation looks good to me. It seems that you
> > have
> > >> >>>> been thinking about this for a long time. It's very good. Looking
> > >> forward
> > >> >>>> to the promotion of FLIP-54.
> > >> >>>>
> > >> >>>> [1]
> > >> >>>>
> > >>
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
> > >> >>>>
> > >> >>>> Best,
> > >> >>>> Jingsong Lee
> > >> >>>>
> > >> >>>> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <[hidden email]
> >
> > >> wrote:
> > >> >>>>
> > >> >>>>> Hi Jingsong,
> > >> >>>>>
> > >> >>>>> +1 for adding a context in the source and sink factories. A
> > context
> > >> >>>>> class also allows for future modifications without touching the
> > >> >>>>> TableFactory interface again.
> > >> >>>>>
> > >> >>>>> How about:
> > >> >>>>>
> > >> >>>>> interface TableSourceFactory {
> > >> >>>>>       interface Context {
> > >> >>>>>          // ...
> > >> >>>>>       }
> > >> >>>>> }
> > >> >>>>>
> > >> >>>>> Because I find the name `CatalogTableContext` confusing and we
> can
> > >> >>>>> bound
> > >> >>>>> the interface to the factory class itself as an inner interface.
> > >> >>>>>
> > >> >>>>> Read-only access to the configuration also sounds right to me. Can we
> > >> >>>>> remove the `ObjectPath getTablePath()` method? I don't see a reason why a
> > >> >>>>> factory should know the path. And if it must, it should be an
> > >> >>>>> `ObjectIdentifier` instead, to also know about the catalog we are using.
> > >> >>>>>
> > >> >>>>> The `isStreamingMode()` should be renamed to `isBounded()`
> because
> > >> we
> > >> >>>>> would like to use terminology around boundedness rather than
> > >> >>>>> streaming/batch.
> > >> >>>>>
> > >> >>>>> @Bowen: We are in the process of unifying the APIs and thus
> > >> explicitly
> > >> >>>>> avoid specialized methods in the future.
> > >> >>>>>
> > >> >>>>> Can we postpone the change of TableValidators? I don't think
> that
> > >> every
> > >> >>>>> factory needs a schema validator. Ideally, the factory should
> just
> > >> >>>>> return a List<ConfigOption> or ConfigOptionGroup that contains
> the
> > >> >>>>> validation logic as mentioned in the validation part of
> > FLIP-54[1].
> > >> But
> > >> >>>>> currently our config options are not rich enough to have a
> unified
> > >> >>>>> validation. Additionally, the factory should return some
> > properties
> > >> >>>>> such
> > >> >>>>> as "supports event-time" for the schema validation outside of
> the
> > >> >>>>> factory itself.
> > >> >>>>>
> > >> >>>>> Regards,
> > >> >>>>> Timo
> > >> >>>>>
> > >> >>>>> [1]
> > >> >>>>>
> > >> >>>>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
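Timo's option-based validation idea can be sketched roughly as follows. The `Option` type below is a deliberately simplified stand-in for FLIP-54's ConfigOption, which carries much richer type information; only the shape of framework-driven validation is illustrated:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class OptionValidationDemo {

    // Simplified stand-in for a declarative option descriptor; the real
    // FLIP-54 ConfigOption also carries types, defaults and deprecation info.
    static final class Option {
        final String key;
        final boolean required;
        Option(String key, boolean required) {
            this.key = key;
            this.required = required;
        }
    }

    // Generic validation driven by the declared options, so the framework
    // (not each connector) can report errors, e.g. when executing DDL.
    static List<String> validate(List<Option> declared, Map<String, String> properties) {
        List<String> errors = new ArrayList<>();
        Set<String> known = new HashSet<>();
        for (Option o : declared) {
            known.add(o.key);
            if (o.required && !properties.containsKey(o.key)) {
                errors.add("Missing required option: " + o.key);
            }
        }
        for (String key : properties.keySet()) {
            if (!known.contains(key)) {
                errors.add("Unsupported option: " + key);
            }
        }
        return errors;
    }
}
```

Because the declared options are plain data rather than imperative checks buried inside each connector, the same validation could run centrally at DDL time and report errors early.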
> > >> >>>>>
> > >> >>>>>
> > >> >>>>>
> > >> >>>>> On 16.01.20 00:51, Bowen Li wrote:
> > >> >>>>>> Hi Jingsong,
> > >> >>>>>>
> > >> >>>>>> The 1st and 2nd pain points you described are very valid, as
> I'm
> > >> more
> > >> >>>>>> familiar with them. I agree these are shortcomings of the
> current
> > >> >>>>> Flink SQL
> > >> >>>>>> design.
> > >> >>>>>>
> > >> >>>>>> A couple comments on your 1st proposal:
> > >> >>>>>>
> > >> >>>>>> 1. is it better to have explicit APIs like
> > >> >>>>> "createBatchTableSource(...)"
> > >> >>>>>> and "createStreamingTableSource(...)" in TableSourceFactory
> > (would
> > >> be
> > >> >>>>>> similar for sink factory) to let planner handle which mode
> > >> (streaming
> > >> >>>>> vs
> > >> >>>>>> batch) of source should be instantiated? That way we don't need
> > to
> > >> >>>>> always
> > >> >>>>>> let connector developers handling an if-else on
> isStreamingMode.
> > >> >>>>>> 2. I'm not sure of the benefits of having a CatalogTableContext class.
> > >> >>>>>> The path, table, and config are fairly independent of each other, so why
> > >> >>>>>> not pass the config in as a 3rd parameter, as
> > >> >>>>>> `createXxxTableSource(path, catalogTable, tableConfig)`?
> > >> >>>>>>
> > >> >>>>>>
> > >> >>>>>> On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <
> > >> [hidden email]>
> > >> >>>>> wrote:
> > >> >>>>>>
> > >> >>>>>>> Hi dev,
> > >> >>>>>>>
> > >> >>>>>>> I'd like to kick off a discussion on the improvement of
> > >> >>>>> TableSourceFactory
> > >> >>>>>>> and TableSinkFactory.
> > >> >>>>>>>
> > >> >>>>>>> Motivation:
> > >> >>>>>>> Now the main needs and problems are:
> > >> >>>>>>> 1. Connectors can't get the TableConfig [1], and some behaviors really
> > >> >>>>>>> need to be controlled by the user's table configuration. In the era of
> > >> >>>>>>> catalogs, we can't put these configs in connector properties, which is
> > >> >>>>>>> too inconvenient.
> > >> >>>>>>> 2. Connectors can't know whether this is batch or stream execution
> > >> >>>>>>> mode, but the sink implementations for batch and stream are totally
> > >> >>>>>>> different. I understand there is an update-mode property now, but it
> > >> >>>>>>> splits batch and stream in the catalog dimension. In fact, this
> > >> >>>>>>> information can be obtained through the current TableEnvironment.
> > >> >>>>>>> 3. There is no interface to invoke validation. Our validation currently
> > >> >>>>>>> lives in util classes, so it depends on whether or not the connector
> > >> >>>>>>> calls them. We now have some new validations to add, such as [2], which
> > >> >>>>>>> really confuses users and even developers. Another problem is that our
> > >> >>>>>>> SQL DDL does not have validation [3]. It is better to report an error
> > >> >>>>>>> when executing the DDL; otherwise it will confuse the user.
> > >> >>>>>>>
> > >> >>>>>>> Proposed change draft for 1 and 2:
> > >> >>>>>>>
> > >> >>>>>>> interface CatalogTableContext {
> > >> >>>>>>>      ObjectPath getTablePath();
> > >> >>>>>>>      CatalogTable getTable();
> > >> >>>>>>>      ReadableConfig getTableConfig();
> > >> >>>>>>>      boolean isStreamingMode();
> > >> >>>>>>> }
> > >> >>>>>>>
> > >> >>>>>>> public interface TableSourceFactory<T> extends TableFactory {
> > >> >>>>>>>
> > >> >>>>>>>      default TableSource<T>
> > createTableSource(CatalogTableContext
> > >> >>>>> context) {
> > >> >>>>>>>         return createTableSource(context.getTablePath(),
> > >> >>>>> context.getTable());
> > >> >>>>>>>      }
> > >> >>>>>>>
> > >> >>>>>>>      ......
> > >> >>>>>>> }
> > >> >>>>>>>
> > >> >>>>>>> Proposed change draft for 3:
> > >> >>>>>>>
> > >> >>>>>>> public interface TableFactory {
> > >> >>>>>>>
> > >> >>>>>>>      TableValidators validators();
> > >> >>>>>>>
> > >> >>>>>>>      interface TableValidators {
> > >> >>>>>>>         ConnectorDescriptorValidator connectorValidator();
> > >> >>>>>>>         TableSchemaValidator schemaValidator();
> > >> >>>>>>>         FormatDescriptorValidator formatValidator();
> > >> >>>>>>>      }
> > >> >>>>>>> }
> > >> >>>>>>>
> > >> >>>>>>> What do you think?
> > >> >>>>>>>
> > >> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-15290
> > >> >>>>>>> [2]
> > >> >>>>>>>
> > >> >>>>>>>
> > >> >>>>>
> > >>
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
> > >> >>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15509
> > >> >>>>>>>
> > >> >>>>>>> Best,
> > >> >>>>>>> Jingsong Lee
> > >> >>>>>>>
> > >> >>>>>>
> > >> >>>>>
> > >> >>>>>
> > >> >>>>
> > >> >>>> --
> > >> >>>> Best, Jingsong Lee
> > >> >>>>
> > >> >>>
> > >> >>>
> > >> >>> --
> > >> >>> Best, Jingsong Lee
> > >> >>>
> > >> >>
> > >> >>
> > >> >> --
> > >> >> Best, Jingsong Lee
> > >> >>
> > >> >
> > >> >
> > >>
> > >>
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>
>
> --
> Best regards!
> Rui Li
>

Re: [DISCUSS] Improve TableFactory

Jingsong Li
Hi Jark,

I think the user ClassLoader is clear in the SQL CLI. I agree with you that
we can add a ClassLoader to the Context.

But it is unclear how to obtain the user ClassLoader in TableEnvironment;
there is no ClassLoader in TableEnvironment today. (Maybe EnvironmentSettings
could contain the user ClassLoader in the future.)

So I recommend that we first design a complete story for ClassLoaders in the
Table API.

Guowei and Yingjie are working on supporting Table API connectors in plugins,
which is also a ClassLoader matter, so I think they can provide some input.

Best,
Jingsong Lee

On Fri, Feb 21, 2020 at 4:02 PM Jark Wu <[hidden email]> wrote:

> Hi all,
>
> I would like to add an additional method `getClassLoader()` to the context,
> because a TableFactory may require this classloader to find another
> TableFactory, e.g. we will look up the format factory in
> KafkaTableSourceSinkFactory. See FLINK-15992.
>
> I don't think we need a new VOTE for this; I just want to make this
> discussion more public.
> What do you think?
>
> Best,
> Jark
>
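Jark's point is about factory discovery: a factory that needs to locate another factory (e.g. a format factory) must run the lookup against the right ClassLoader. The JDK's `ServiceLoader`, which underlies this kind of SPI discovery, takes the ClassLoader explicitly; the `TableFactory` interface below is a stand-in for illustration, not the Flink one:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class FactoryDiscoveryDemo {

    // Stand-in for a factory SPI; real discovery in Flink goes through
    // META-INF/services entries for its own TableFactory interface.
    public interface TableFactory { }

    // Discovery takes an explicit ClassLoader: a factory that itself looks
    // up another factory needs the user ClassLoader, otherwise user-provided
    // implementations are invisible to it.
    static List<TableFactory> discover(ClassLoader classLoader) {
        List<TableFactory> found = new ArrayList<>();
        for (TableFactory f : ServiceLoader.load(TableFactory.class, classLoader)) {
            found.add(f);
        }
        return found;
    }

    public static void main(String[] args) {
        // No service files are registered in this sketch, so discovery is
        // empty; the point is that the ClassLoader is an explicit input.
        List<TableFactory> factories =
                discover(Thread.currentThread().getContextClassLoader());
        System.out.println("discovered: " + factories.size());
    }
}
```

This is why passing the ClassLoader through the creation context matters: with only the framework's own ClassLoader, nested discovery cannot see user jars.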
> On Wed, 5 Feb 2020 at 16:05, Rui Li <[hidden email]> wrote:
>
> > +1, thanks for the efforts.
> >
> > On Wed, Feb 5, 2020 at 4:00 PM Jingsong Li <[hidden email]>
> wrote:
> >
> > > Hi all,
> > >
> > > As Jark suggested in the VOTE thread, I created a JIRA:
> > > https://issues.apache.org/jira/browse/FLINK-15912
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Wed, Feb 5, 2020 at 10:57 AM Jingsong Li <[hidden email]>
> > > wrote:
> > >
> > > > Hi Timo,
> > > >
> > > > Good catch!
> > > >
> > > > I really love idea 2; a full Flink config looks very good to me.
> > > >
> > > > Trying to understand your first point: actually we don't have a
> > > > `TableIdentifier` class now, but TableFactory already indicates the
> > > > table, so I am OK with it.
> > > >
> > > > New Context should be:
> > > >
> > > >    /**
> > > >     * Context of table source creation. Contains table information
> and
> > > environment information.
> > > >     */
> > > >    interface Context {
> > > >       /**
> > > >        * @return full identifier of the given {@link CatalogTable}.
> > > >        */
> > > >       ObjectIdentifier getObjectIdentifier();
> > > >       /**
> > > >        * @return table {@link CatalogTable} instance.
> > > >        */
> > > >       CatalogTable getTable();
> > > >       /**
> > > >        * @return readable config of this table environment.
> > > >        */
> > > >       ReadableConfig getConfiguration();
> > > >    }
> > > >
> > > >
> > > > Best,
> > > > Jingsong Lee
> > > >
> > > > On Tue, Feb 4, 2020 at 8:51 PM Timo Walther <[hidden email]>
> > wrote:
> > > >
> > > >> Hi Jingsong,
> > > >>
> > > >> some last minute changes from my side:
> > > >>
> > > >> 1. rename `getTableIdentifier` to `getObjectIdentifier` to keep the API
> > > >> obvious. Otherwise people expect a `TableIdentifier` class to be
> > > >> returned here.
> > > >>
> > > >> 2. rename `getTableConfig` to `getConfiguration()`; in the future this
> > > >> will not only be a "table" config but might give access to the full
> > > >> Flink config.
> > > >>
> > > >> Thanks,
> > > >> Timo
> > > >>
> > > >>
> > > >
> > > > --
> > > > Best, Jingsong Lee
> > > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


--
Best, Jingsong Lee