[DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

bowen.li
Hi dev,

I'd like to kick off a discussion on adding JDBC catalogs, specifically a
Postgres catalog, to Flink [1].

Currently users have to manually create schemas in Flink source/sink
mirroring tables in their relational databases in use cases like JDBC
read/write and consuming CDC. Many users have complained about the
unnecessary, redundant, manual work. Any mismatch can lead to a failing
Flink job at runtime instead of compile time. All of this has been quite
unpleasant, resulting in a broken user experience.

We want to provide a JDBC catalog interface and a Postgres implementation
for Flink as a start to connect to all kinds of relational databases,
enabling Flink SQL to 1) retrieve table schemas automatically without
requiring users to write duplicate DDL and 2) check for schema errors at
compile time. It will greatly streamline the user experience when using
Flink with popular relational databases like Postgres, MySQL, MariaDB,
AWS Aurora, etc.

Note that the problem and solution are actually very general to Flink when
connecting to all kinds of external systems. We just focus on solving that
for relational databases in this FLIP.

Thanks,
Bowen

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+JDBC+catalog+and+Postgres+catalog

Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

Jark Wu-2
Thanks Bowen for driving this.

+1 to this feature.

My concern is why we would introduce a `PostgresJDBCCatalog` rather than a
generic `JDBCCatalog` (catalog.type = 'postgres' vs. 'jdbc')?
From my understanding, the JDBC catalog is similar to the JDBC source/sink.
For the JDBC source/sink, we have a generic implementation for JDBC and
delegate operations to JDBCDialect. Different drivers may have different
implementations of JDBCDialect methods, e.g. `quoteIdentifier()`.

For the JDBC catalog, I guess we can do it the same way, i.e. a generic
JDBCCatalog implementation that delegates operations to JDBCDialect, and
we would add `listDataBase()` and `listTables()` interfaces to JDBCDialect.
The benefits are:
0) it reuses the existing `JDBCDialect`; I guess JDBCCatalog also needs to
quote identifiers.
1) we can easily support a new database catalog (e.g. MySQL) by
implementing a new dialect (e.g. MySQLDialect).
2) it keeps the same behavior as the JDBC source/sink, i.e.
connector.type=jdbc, catalog.type=jdbc
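
To make the idea concrete, here is a minimal Java sketch of a generic
catalog that delegates the database-specific pieces to a dialect. All names
in it (`SketchDialect`, `SketchJdbcCatalog`, `listDatabasesQuery()`) are
hypothetical and are not Flink's actual `JDBCDialect` API; only the idea of
`quoteIdentifier()` is taken from the existing dialect. The SQL strings are
illustrative assumptions.

```java
// Hypothetical dialect abstraction for illustration; not Flink's JDBCDialect.
interface SketchDialect {
    // Whether this dialect applies to the given JDBC url.
    boolean canHandle(String url);

    // Quote an identifier the way the target database expects.
    // (No escaping of embedded quote characters in this sketch.)
    String quoteIdentifier(String identifier);

    // SQL used to enumerate databases (assumed queries).
    String listDatabasesQuery();
}

final class PostgresSketchDialect implements SketchDialect {
    @Override public boolean canHandle(String url) {
        return url.startsWith("jdbc:postgresql:");
    }
    @Override public String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }
    @Override public String listDatabasesQuery() {
        return "SELECT datname FROM pg_database";
    }
}

final class MySqlSketchDialect implements SketchDialect {
    @Override public boolean canHandle(String url) {
        return url.startsWith("jdbc:mysql:");
    }
    @Override public String quoteIdentifier(String identifier) {
        return "`" + identifier + "`";
    }
    @Override public String listDatabasesQuery() {
        return "SHOW DATABASES";
    }
}

// Generic catalog: contains no database-specific logic of its own.
final class SketchJdbcCatalog {
    private final SketchDialect dialect;

    private SketchJdbcCatalog(SketchDialect dialect) {
        this.dialect = dialect;
    }

    // Pick the first dialect that claims the url.
    static SketchJdbcCatalog forUrl(String url, SketchDialect... candidates) {
        for (SketchDialect d : candidates) {
            if (d.canHandle(url)) {
                return new SketchJdbcCatalog(d);
            }
        }
        throw new IllegalArgumentException("No dialect for " + url);
    }

    String selectAllQuery(String table) {
        return "SELECT * FROM " + dialect.quoteIdentifier(table);
    }

    String listDatabasesQuery() {
        return dialect.listDatabasesQuery();
    }
}
```

With this shape, supporting another database would mean adding one more
dialect class and passing it to `forUrl`.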

Best,
Jark



Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

Jingsong Li
Thanks Bowen for driving this,

+1 for this. The DDL schema definition is a headache for users, and a
catalog is a solution to this problem.

I have some questions and suggestions:

- We can provide a Builder for the catalog. In my opinion, defaultDatabase,
username, and pwd can be included in the JDBC DB url.

- About timestamp and time: could you write down the specific Flink
precision for the Postgres types?

- I think there is a part missing in your document, namely how to use this
catalog. If you can write a complete example, I think it will be much
clearer.

- Another question is which TableFactory this catalog will use. For
example, JDBCTableSourceSinkFactory has different parameters for source and
sink. What do you think about it?

Best,
Jingsong Lee


Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

bowen.li
Hi Jark and Jingsong,

Thanks for your review. Please see my replies inline.

> why introducing a `PostgresJDBCCatalog`, not a generic `JDBCCatalog`
> (catalog.type = 'postgres' vs 'jdbc') ?

Thanks for the reminder; I looked at JDBCDialect. A generic, user-facing
JDBCCatalog with catalog.type = jdbc that finds the specific db
implementation (pg vs. mysql vs. ...) is indeed more aligned with how the
jdbc sink/source is handled. However, the catalogs would also need to
execute queries and parse query results in a db-dependent way. E.g. the
jdbc catalog needs to establish connections to different databases within
a db instance on demand. So just having JDBCDialect won't be enough.

I think we can do the following:
  - provide a user-facing JDBCCatalog, composing a db-specific impl like
PostgresJDBCCatalog and MySQLJDBCCatalog. Users still specify "jdbc" as
the type in both Table API and SQL CLI; internally it will create a
db-specific impl depending on the jdbc base url.
  - some statements can reside in JDBCDialect. Query execution and result
parsing logic would be located in db-specific impls.
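
A rough Java sketch of this composition, under stated assumptions: every
class name below is hypothetical (these are not the FLIP's actual classes),
the url prefixes are the usual driver prefixes, and the table-listing
queries are only illustrative. It shows a user-facing class choosing a
db-specific impl from the base url, and the impl deriving a per-database
connection url on demand.

```java
// Hypothetical names throughout; a sketch, not the FLIP's actual classes.
abstract class DbSpecificCatalogSketch {
    protected final String baseUrl;

    DbSpecificCatalogSketch(String baseUrl) {
        // Normalize so that baseUrl ends with '/'.
        this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
    }

    // Url for connecting to one specific database of the db instance.
    String urlFor(String database) {
        return baseUrl + database;
    }

    // Db-dependent query; result parsing would also live in the subclass.
    abstract String listTablesQuery();
}

final class PostgresCatalogSketch extends DbSpecificCatalogSketch {
    PostgresCatalogSketch(String baseUrl) {
        super(baseUrl);
    }
    @Override String listTablesQuery() {
        return "SELECT schemaname, tablename FROM pg_tables";
    }
}

final class MySqlCatalogSketch extends DbSpecificCatalogSketch {
    MySqlCatalogSketch(String baseUrl) {
        super(baseUrl);
    }
    @Override String listTablesQuery() {
        return "SHOW TABLES";
    }
}

// The only class users would see; the type stays "jdbc" for every database.
final class JdbcCatalogFacadeSketch {
    private final DbSpecificCatalogSketch delegate;

    JdbcCatalogFacadeSketch(String baseUrl) {
        if (baseUrl.startsWith("jdbc:postgresql:")) {
            delegate = new PostgresCatalogSketch(baseUrl);
        } else if (baseUrl.startsWith("jdbc:mysql:")) {
            delegate = new MySqlCatalogSketch(baseUrl);
        } else {
            throw new UnsupportedOperationException(
                    "Unsupported jdbc url: " + baseUrl);
        }
    }

    String urlFor(String database) {
        return delegate.urlFor(database);
    }

    String listTablesQuery() {
        return delegate.listTablesQuery();
    }
}
```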

> - We can provide a Builder for Catalog, In my opinion, defaultDatabase,
> username, pwd can be included in JDBC DB url.

I don't see much value in providing a builder for jdbc catalogs, as they
only have 4 or 5 required params and no optional ones. I prefer that users
just provide a base url without default db, username, and pwd so we don't
need to parse the url all around; as I mentioned, the jdbc catalog may need
to establish connections to different databases in a db instance.

> - About timestamp and time, write down the specific Flink precision of
> Postgres?

I've documented that. It's 0-6.

> - I think there is a part missing in your document, that is how to use this
> catalog. If you can write a complete example, I think it will be much
> clearer.

I added some examples for both the Table API and the SQL CLI. It will be no
different from existing catalogs.

> - So a thing is what TableFactory will this catalog use? For example,
> JDBCTableSourceSinkFactory has different parameters for source or sink? How
> do you think about it?

This catalog will directly call JDBCTableSourceSinkFactory without going
through service discovery because we are sure it's a jdbc table. I added it
to the doc.

For the different params besides schema, as we discussed offline,
unfortunately we can't do anything right now until Flink DDL/DML are able
to distinguish 3 types of params: external data's metadata, source/sink
runtime params, and Flink semantics params. The latter two can't be
provided by catalogs. The problem is actually general to all catalogs, not
just JDBCCatalog. I'm pushing for an effort to solve it. At this moment we
can only use some default params for some cases; the other cases cannot
take advantage of the JDBC catalog, and users still have to write DDL
manually.

Thanks,
Bowen


Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

Jark Wu-2
Thanks Bowen for the reply,

A user-facing JDBCCatalog and 'catalog.type' = 'jdbc'  sounds good to me.

I have some other minor comments from going through the updated
documentation:

1) 'base_url' configuration: we follow the configuration format guideline
[1], which suggests using a dash (-) instead of an underscore (_).
     Also, I was a little confused by the meaning of "base_url" at first
glance; another idea is to split it into several configurations: 'driver',
'hostname', 'port'.

2) 'default-database' is optional; which database will be used, or what is
the behavior, when the default database is not specified?

3) a builder for jdbc catalogs: I agree with Jingsong that we should
provide a builder, because there is optional configuration here (the
default database), and providing a Builder as the API will make evolution
easier. I'm not sure we won't add/modify parameters in the future.

[1]:
https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes


Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

Danny Chan
Thanks Bowen for bringing up this discussion ~

I think the JDBC catalog is a useful feature.

Just one question about the “Flink-Postgres Metaspace Mapping” part:

Since PostgreSQL does not have catalogs but has schemas under a database, why not map the PG database to the Flink catalog and the PG schema to the Flink database?

Flink Catalog Metaspace Structure     | Postgres Metaspace Structure
--------------------------------------+-----------------------------
catalog name (defined in Flink only)  | database name
database name                         | schema name
table name                            | table name

Because “catalog/database/table” are just categories for managing tables, and they are not standard, this is just a suggestion from my side.
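
As a small illustration of the mapping suggested above, the following Java sketch translates a Flink `database.table` pair into a Postgres `schema.table` reference inside the one PG database that a Flink catalog would be bound to. The class and method names are hypothetical and exist only for this example.

```java
// Sketch of the suggested mapping; all names here are hypothetical.
final class PgMappingSketch {
    // The Postgres database this Flink catalog is bound to.
    final String pgDatabase;

    PgMappingSketch(String pgDatabase) {
        this.pgDatabase = pgDatabase;
    }

    // Flink database -> PG schema, Flink table -> PG table.
    String toPostgresTable(String flinkDatabase, String flinkTable) {
        return quote(flinkDatabase) + "." + quote(flinkTable);
    }

    // Postgres-style identifier quoting: wrap in double quotes and
    // double any embedded double quote.
    private static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }
}
```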

I’m a little worried about the extensibility of Flink's current categories, i.e. what if there are JDBC vendors that have something other than “schema” (for example “namespace” or something like that)? For example, here is what I found for MySQL [1]:

> As defined in the MySQL Glossary:
> In MySQL, physically, a schema is synonymous with a database. You can substitute the keyword SCHEMA instead of DATABASE in MySQL SQL syntax, for example using CREATE SCHEMA instead of CREATE DATABASE.


[1] https://stackoverflow.com/questions/11618277/difference-between-schema-database-in-mysql


Best,
Danny Chan

Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

Jingsong Li
Hi Bowen, thanks for reply and updating.

> I don't see much value in providing a builder for jdbc catalogs, as they
> only have 4 or 5 required params, no optional ones. I prefer users just
> provide a base url without default db, usrname, pwd so we don't need to
> parse url all around, as I mentioned jdbc catalog may need to establish
> connections to different databases in a db instance,

I suggest keeping the parameters fully consistent with JDBCTableSource /
JDBCTableSink.
Take a look at the JDBC API "DriverManager.getConnection": it makes the
default db, username and pwd optional, because they can be included in the
URL. Of course, the JDBC API also allows establishing connections to
different databases in a db instance.
So I don't think we need to provide a "base_url"; we can just provide a real
"url", to be consistent with the JDBC API.
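
To illustrate the point about a real "url", here is a minimal Python sketch (not Flink code). It assumes the Postgres-style form jdbc:postgresql://host:port/database?user=...&password=..., and the helper name split_jdbc_url is hypothetical. It only shows that the default db, username and pwd can be read back from a single URL string when present, and are simply absent otherwise.

```python
from urllib.parse import urlsplit, parse_qs


def split_jdbc_url(url):
    """Split a Postgres-style JDBC URL into (base_url, database, properties).

    Hypothetical helper for illustration only; it is not part of Flink or JDBC.
    """
    if not url.startswith("jdbc:"):
        raise ValueError("not a JDBC URL: " + url)
    # Drop the "jdbc:" prefix so the rest parses like an ordinary URL.
    parts = urlsplit(url[len("jdbc:"):])
    base_url = "jdbc:%s://%s/" % (parts.scheme, parts.netloc)
    # The path carries the optional default database.
    database = parts.path.lstrip("/") or None
    # Query parameters carry optional settings such as user and password.
    properties = {key: values[0] for key, values in parse_qs(parts.query).items()}
    return base_url, database, properties
```

With a full URL the sketch returns the base part, the database name and the credentials; with only a base URL it returns None for the database and an empty property map.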

Best,
Jingsong Lee


Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

Zhenghua Gao
Hi Bowen, thanks for driving this.
I think it would be very convenient to use tables in external DBs with the
JDBC catalog.

I have one concern about the "Flink-Postgres Data Type Mapping" part:

In Postgres, TIME/TIMESTAMP WITH TIME ZONE has java.time.Instant
semantics and should be mapped to Flink's TIME/TIMESTAMP WITH LOCAL TIME
ZONE.

*Best Regards,*
*Zhenghua Gao*



Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

bowen.li
Thanks everyone for the prompt feedback. Please see my response below.

> In Postgress, the TIME/TIMESTAMP WITH TIME ZONE has the java.time.Instant
> semantic, and should be mapped to Flink's TIME/TIMESTAMP WITH LOCAL TIME
> ZONE

Zhenghua, you are right that pg's 'timestamp with time zone' should be
translated into Flink's 'timestamp with local time zone'. I can't find a
'time with (local) time zone' type in Flink, though, so we may not be able
to support that pg type.
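
As a rough illustration of that mapping, here is a minimal Python sketch (not the FLIP's code). The table PG_TO_FLINK and the function to_flink_type are hypothetical names, the Postgres type names are the common short aliases, and the 0-6 precision range is the one mentioned earlier in this thread. The type with no counterpart is rejected explicitly rather than mapped silently.

```python
# Hypothetical mapping table for illustration; not the FLIP's actual code.
PG_TO_FLINK = {
    "timestamp": "TIMESTAMP({p})",
    "timestamptz": "TIMESTAMP({p}) WITH LOCAL TIME ZONE",
    "time": "TIME({p})",
    # "timetz" is left out on purpose: no matching Flink type was found.
}


def to_flink_type(pg_type, precision=6):
    """Return the Flink SQL type name for a Postgres date/time type."""
    if not 0 <= precision <= 6:
        raise ValueError("precision must be between 0 and 6")
    if pg_type not in PG_TO_FLINK:
        raise NotImplementedError("no Flink type for Postgres type: " + pg_type)
    return PG_TO_FLINK[pg_type].format(p=precision)
```

Failing loudly for 'time with time zone' keeps the schema error visible when the catalog is read instead of surfacing later in a running job.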

> I suggest that the parameters can be completely consistent with the
> JDBCTableSource / JDBCTableSink. If you take a look to JDBC api:
> "DriverManager.getConnection".
> That allow "default db, username, pwd" things optional. They can included
> in URL. Of course JDBC api also allows establishing connections to
> different databases in a db instance. So I think we don't need provide a
> "base_url", we can just provide a real "url". To be consistent with JDBC
> api.

Jingsong, what I'm saying is that a builder can be added later, on demand,
if enough users request it; it doesn't need to be a core part of the FLIP.

Besides, unfortunately Postgres doesn't allow changing databases via JDBC.

JDBC provides different connection options, as you mentioned, but I'd like
to keep our design and API simple and avoid having to handle extra parsing
logic. This doesn't shut the door on what you proposed as a future effort.
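
To make the base url idea concrete, here is a minimal Python sketch (not Flink code). Because a Postgres connection cannot switch databases, a catalog would open one connection per database by appending the database name to the base url. The class PerDatabaseConnections and the injected connect callable are hypothetical; a real implementation would go through the JDBC driver.

```python
class PerDatabaseConnections:
    """Opens and caches one connection per database under a shared base url.

    Hypothetical helper for illustration; `connect` is any callable that
    takes a full URL and returns a connection object.
    """

    def __init__(self, base_url, connect):
        self._base_url = base_url.rstrip("/")
        self._connect = connect
        self._open = {}

    def url_for(self, database):
        # No URL parsing is needed: the database name is simply appended.
        return self._base_url + "/" + database

    def get(self, database):
        # Reuse the connection if this database was already opened.
        if database not in self._open:
            self._open[database] = self._connect(self.url_for(database))
        return self._open[database]
```

The trade-off discussed above shows up in url_for: with a base url the per-database URL is a plain concatenation, whereas a full url would first have to be split apart.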

> Since the PostgreSQL does not have catalog but schema under database, why
> not mapping the PG-database to Flink catalog and PG-schema to Flink database

Danny, because 1) there are frequent use cases where users want to switch
databases or reference objects across databases in a pg instance; 2)
schema is an optional namespace layer in pg: it always has a default value
("public") and can stay invisible to users if they prefer, as shown in the
FLIP; 3) as you mentioned, it is specific to Postgres, and I don't feel it's
necessary to map Postgres substantially differently from other DBMSs at the
cost of additional complexity.
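
One way to picture point 2) is the minimal Python sketch below (not the FLIP's code). It assumes, purely for illustration, that a table is named either "table" or "schema.table" inside a database, and that a missing schema falls back to "public". The function name split_pg_table_name is hypothetical.

```python
def split_pg_table_name(name, default_schema="public"):
    """Return (schema, table) for a table name with an optional schema prefix.

    Hypothetical helper for illustration; the naming convention is assumed.
    """
    schema, separator, table = name.partition(".")
    if not separator:
        # No schema given: the user never has to see the schema layer.
        return default_schema, name
    return schema, table
```

Users who never use schemas can write plain table names, while others can still address a table in a non-default schema.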

> 'base_url' configuration: We are following the configuration format
> guideline [1] which suggest to use dash (-) instead of underline (_). And
> I'm a little confused the meaning of "base_url" at the first glance,
> another idea is split it into several configurations: 'driver', 'hostname',
> 'port'.

Jark, I agree we should use "base-url" in the yaml config.

I'm not sure about configuring hostname and port separately, because you
can specify multiple hosts with ports in a jdbc url, like
"jdbc:dbms/host1:port1,host2:port2/", for connection failover. Separating
them would make configuration harder.
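
The following minimal Python sketch (not Flink code) shows why single 'hostname' and 'port' options would not be enough: a failover-style host list yields several host/port pairs. The function parse_hosts is a hypothetical name, and the comma-separated host:port form is assumed from the example above; IPv6 literals are not handled.

```python
def parse_hosts(authority):
    """Parse "host1:port1,host2:port2" into a list of (host, port) pairs.

    Hypothetical helper for illustration; the port is None when omitted.
    """
    hosts = []
    for item in authority.split(","):
        host, _, port = item.partition(":")
        hosts.append((host, int(port) if port else None))
    return hosts
```

A single url option passes such a list through to the driver unchanged, whereas separate options would need list-valued settings to express the same thing.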

I will add clear docs and an example to avoid any possible confusion.

> 'default-database' is optional, then which database will be used or what
> is the behavior when the default database is not selected.

This should be DBMS specific. For postgres, it will be the <access id>
database.



Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

bowen.li
Hi devs,

I've updated the wiki according to the feedback. Please take another look.

Thanks!


On Fri, Jan 10, 2020 at 2:24 PM Bowen Li <[hidden email]> wrote:

> Thanks everyone for the prompt feedback. Please see my responses below.
>
> > In Postgres, TIME/TIMESTAMP WITH TIME ZONE has java.time.Instant
> > semantics, and should be mapped to Flink's TIME/TIMESTAMP WITH LOCAL
> > TIME ZONE
>
> Zhenghua, you are right that pg's 'timestamp with time zone' should be
> translated into Flink's 'timestamp with local time zone'. I don't find a
> 'time with (local) time zone' type in Flink though, so we may not support
> that type from pg in Flink.
>
> > I suggest that the parameters be completely consistent with
> > JDBCTableSource / JDBCTableSink. If you take a look at the JDBC API
> > "DriverManager.getConnection", it makes things like "default db,
> > username, pwd" optional; they can be included in the URL. Of course the
> > JDBC API also allows establishing connections to different databases in
> > a db instance. So I think we don't need to provide a "base_url"; we can
> > just provide a real "url", to be consistent with the JDBC API.
>
> Jingsong, what I'm saying is that a builder can be added on demand later
> if enough users request it; it doesn't need to be a core part of the FLIP.
>
> Besides, unfortunately Postgres doesn't allow changing databases via JDBC.
>
> JDBC provides different connection options as you mentioned, but I'd like
> to keep our design and API simple and avoid having to handle extra parsing
> logic. It also doesn't shut the door on what you proposed as a future
> effort.
>
> > Since PostgreSQL does not have catalogs but schemas under a database,
> > why not map the PG database to a Flink catalog and the PG schema to a
> > Flink database?
>
> Danny, because 1) there are frequent use cases where users want to switch
> databases or reference objects across databases in a pg instance; 2)
> schema is an optional namespace layer in pg: it always has a default value
> ("public") and can be invisible to users if they'd like, as shown in the
> FLIP; 3) as you mentioned, it is specific to Postgres, and I don't feel
> it's necessary to map Postgres substantially differently from other DBMSs
> at the cost of additional complexity.
>
> > 'base_url' configuration: We are following the configuration format
> > guideline [1], which suggests using dash (-) instead of underscore (_).
> > And I'm a little confused about the meaning of "base_url" at first
> > glance; another idea is to split it into several configurations:
> > 'driver', 'hostname', 'port'.
>
> Jark, I agree we should use "base-url" in the yaml config.
>
> I'm not sure about having hostname and port separately, because you can
> specify multiple hosts with ports in jdbc, like
> "jdbc:dbms/host1:port1,host2:port2/", for connection failovers. Separating
> them would make configurations harder.
>
> I will add clear docs and an example to avoid any possible confusion.
>
> > 'default-database' is optional, then which database will be used, or
> > what is the behavior when the default database is not selected?
>
> This should be DBMS specific. For postgres, it will be the <access id>
> database.
>
> On Thu, Jan 9, 2020 at 9:48 PM Zhenghua Gao <[hidden email]> wrote:
>
>> Hi Bowen, thanks for driving this.
>> I think it would be very convenient to use tables in external DBs with
>> the JDBC catalog.
>>
>> I have one concern about the "Flink-Postgres Data Type Mapping" part:
>>
>> In Postgres, TIME/TIMESTAMP WITH TIME ZONE has java.time.Instant
>> semantics, and should be mapped to Flink's TIME/TIMESTAMP WITH LOCAL
>> TIME ZONE.
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Fri, Jan 10, 2020 at 11:09 AM Jingsong Li <[hidden email]>
>> wrote:
>>
>> > Hi Bowen, thanks for the reply and the update.
>> >
>> > > I don't see much value in providing a builder for jdbc catalogs, as
>> > > they only have 4 or 5 required params, no optional ones. I prefer
>> > > users just provide a base url without default db, username, pwd so we
>> > > don't need to parse the url all around, as I mentioned jdbc catalog
>> > > may need to establish connections to different databases in a db
>> > > instance,
>> >
>> > I suggest that the parameters be completely consistent with
>> > JDBCTableSource / JDBCTableSink.
>> > If you take a look at the JDBC API "DriverManager.getConnection", it
>> > makes things like "default db, username, pwd" optional; they can be
>> > included in the URL. Of course the JDBC API also allows establishing
>> > connections to different databases in a db instance.
>> > So I think we don't need to provide a "base_url"; we can just provide a
>> > real "url", to be consistent with the JDBC API.
>> >
>> > Best,
>> > Jingsong Lee
>> >
>> > On Fri, Jan 10, 2020 at 10:34 AM Jark Wu <[hidden email]> wrote:
>> >
>> > > Thanks Bowen for the reply,
>> > >
>> > > A user-facing JDBCCatalog and 'catalog.type' = 'jdbc' sounds good to
>> > > me.
>> > >
>> > > I have some other minor comments from going through the updated
>> > > documentation:
>> > >
>> > > 1) 'base_url' configuration: We are following the configuration format
>> > > guideline [1], which suggests using dash (-) instead of underscore
>> > > (_). And I'm a little confused about the meaning of "base_url" at
>> > > first glance; another idea is to split it into several
>> > > configurations: 'driver', 'hostname', 'port'.
>> > >
>> > > 2) 'default-database' is optional, then which database will be used,
>> > > or what is the behavior when the default database is not selected?
>> > >
>> > > 3) a builder for jdbc catalogs: I agree with Jingsong on providing a
>> > > builder, because there is optional configuration here (the default
>> > > database), and providing a Builder as the API will make evolution
>> > > easier. I'm not sure we won't add/modify parameters in the future.
>> > >
>> > > [1]:
>> > > https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes
>> > >
>> > > On Fri, 10 Jan 2020 at 04:52, Bowen Li <[hidden email]> wrote:
>> > >
>> > > > Hi Jark and Jingsong,
>> > > >
>> > > > Thanks for your review. Please see my replies inline.
>> > > >
>> > > > > why introducing a `PostgresJDBCCatalog`, not a generic
>> > > > > `JDBCCatalog` (catalog.type = 'postgres' vs 'jdbc') ?
>> > > >
>> > > > Thanks for the reminder; I looked at JDBCDialect. A generic,
>> > > > user-facing JDBCCatalog with catalog.type = jdbc that finds the
>> > > > specific db implementation (pg vs. mysql vs. ...) is indeed more
>> > > > aligned with how the jdbc sink/source is handled. However, the
>> > > > catalogs would also need to execute queries and parse query results
>> > > > in a db-dependent way. E.g. a jdbc catalog needs to establish
>> > > > connections to different databases within a db instance on demand.
>> > > > So just having JDBCDialect won't be enough.
>> > > >
>> > > > I think we can do the following:
>> > > >   - provide a user-facing JDBCCatalog, composing a db-specific impl
>> > > >     like PostgresJDBCCatalog and MySQLJDBCCatalog. Users still
>> > > >     specify "jdbc" as the type in both Table API and SQL CLI;
>> > > >     internally it will create a db-specific impl depending on the
>> > > >     jdbc base url.
>> > > >   - some statements can reside in JDBCDialect. Query execution and
>> > > >     result parsing logic would be located in db-specific impls.
>> > > >
>> > > > > - We can provide a Builder for Catalog, In my opinion,
>> > > > > defaultDatabase, username, pwd can be included in JDBC DB url.
>> > > >
>> > > > I don't see much value in providing a builder for jdbc catalogs, as
>> > > > they only have 4 or 5 required params, no optional ones. I prefer
>> > > > users just provide a base url without default db, username, pwd so
>> > > > we don't need to parse the url all around; as I mentioned, the jdbc
>> > > > catalog may need to establish connections to different databases in
>> > > > a db instance.
>> > > >
>> > > > > - About timestamp and time, write down the specific Flink
>> > > > > precision of Postgres?
>> > > >
>> > > > I've documented that. It's 0-6.
>> > > >
>> > > > > - I think there is a part missing in your document, that is how
>> > > > > to use this catalog. If you can write a complete example, I think
>> > > > > it will be much clearer.
>> > > >
>> > > > I added some examples in both Table API and SQL CLI. It will be no
>> > > > different from existing catalogs.
>> > > >
>> > > > > - So a thing is what TableFactory will this catalog use? For
>> > > > > example, JDBCTableSourceSinkFactory has different parameters for
>> > > > > source or sink? How do you think about it?
>> > > >
>> > > > This catalog will directly call JDBCTableSourceSinkFactory without
>> > > > going through service discovery, because we are sure it's a jdbc
>> > > > table. I added it to the doc.
>> > > >
>> > > > For the different params besides schema, as we discussed offline,
>> > > > unfortunately we can't do anything right now until Flink DDL/DML are
>> > > > able to distinguish 3 types of params: external data's metadata,
>> > > > source/sink runtime params, and Flink semantics params. The latter
>> > > > two can't be provided by catalogs. The problem is actually general
>> > > > to all catalogs, not just JDBCCatalog. I'm pushing for such an
>> > > > effort to solve it. At this moment we can only use some default
>> > > > params for some cases; the other cases cannot take advantage of the
>> > > > JDBC catalog, and users still have to write DDL manually.
>> > > >
>> > > > Thanks,
>> > > > Bowen
>> > > >
>> > > > On Wed, Jan 8, 2020 at 7:46 PM Jingsong Li <[hidden email]>
>> > > wrote:
>> > > >
>> > > > > Thanks Bowen for driving this,
>> > > > >
>> > > > > +1 for this. The DDL schema definition is a headache for users,
>> > > > > and a catalog is a solution to this problem.
>> > > > >
>> > > > > I have some questions and suggestions:
>> > > > >
>> > > > > - We can provide a Builder for Catalog. In my opinion,
>> > > > > defaultDatabase, username, pwd can be included in the JDBC DB url.
>> > > > >
>> > > > > - About timestamp and time, write down the specific Flink
>> > > > > precision of Postgres?
>> > > > >
>> > > > > - I think there is a part missing in your document, that is how to
>> > > > > use this catalog. If you can write a complete example, I think it
>> > > > > will be much clearer.
>> > > > >
>> > > > > - What TableFactory will this catalog use? For example,
>> > > > > JDBCTableSourceSinkFactory has different parameters for source or
>> > > > > sink. What do you think about it?
>> > > > >
>> > > > > Best,
>> > > > > Jingsong Lee
>> > > > >
>
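
The "base-url" point in the quoted reply above (one connection per database, since a Postgres JDBC connection cannot switch databases) can be illustrated with a minimal sketch. The class and method names below are hypothetical and are not taken from the FLIP or from Flink; the sketch also assumes the base URL carries no database name and no query parameters.

```java
import java.util.Objects;

/** Hypothetical helper for illustration only; not part of Flink or FLIP-92. */
public final class JdbcUrls {
    private JdbcUrls() {}

    /**
     * Appends a database name to a base URL such as
     * "jdbc:postgresql://localhost:5432/". The host part is treated as opaque,
     * so multi-host failover URLs pass through unchanged.
     */
    public static String forDatabase(String baseUrl, String database) {
        Objects.requireNonNull(baseUrl, "baseUrl");
        Objects.requireNonNull(database, "database");
        if (!baseUrl.startsWith("jdbc:")) {
            throw new IllegalArgumentException("Not a JDBC URL: " + baseUrl);
        }
        if (database.isEmpty() || database.contains("/")) {
            throw new IllegalArgumentException("Invalid database name: " + database);
        }
        // Tolerate a missing trailing slash so both spellings of the base URL work.
        String prefix = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
        return prefix + database;
    }

    public static void main(String[] args) {
        // One base URL, one derived URL per database in the same instance.
        String base = "jdbc:postgresql://host1:5432,host2:5432/";
        System.out.println(forDatabase(base, "orders"));
        System.out.println(forDatabase(base, "billing"));
    }
}
```

Because the host list is never parsed, this keeps the configuration to a single string, which is the trade-off argued for above against separate 'hostname' and 'port' options.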

Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

Flavio Pompermaier
Hi all,
I'm happy to see a lot of interest in easing the integration with JDBC data
sources. Maybe this is a rare situation (though not in my experience), but
what if I have to connect to the same type of source (e.g. MySQL) with two
incompatible versions? How can I load the two (or more) connector jars
without causing conflicts?

On Tue, 14 Jan 2020, 23:32 Bowen Li <[hidden email]> wrote:

> Hi devs,
>
> I've updated the wiki according to the feedback. Please take another look.
>
> Thanks!
>

Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

bowen.li
Hi Flavio,

First, this is a general question about how flink-jdbc is set up rather than
one specific to the JDBC catalog, so it is better discussed in its own thread.

But to quickly answer your question: you need to find out where the
incompatibility is. It can be in 1) the JDBC drivers or 2) the databases
themselves. 1) is fairly stable and backward compatible. 2) normally has to
do with your queries, not the driver.
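
As a rough sketch of how to check which side an incompatibility is on, the
driver and server versions can be printed through the standard
java.sql.DatabaseMetaData interface. The class name VersionProbe and its
command-line arguments are made up for illustration and are not part of
flink-jdbc:

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical helper, not part of flink-jdbc: reports driver and server versions.
final class VersionProbe {

    // Builds a one-line report from the standard JDBC metadata getters.
    static String describe(DatabaseMetaData meta) throws SQLException {
        return String.format(
                "driver=%s %s, database=%s %s",
                meta.getDriverName(),
                meta.getDriverVersion(),
                meta.getDatabaseProductName(),
                meta.getDatabaseProductVersion());
    }

    public static void main(String[] args) throws SQLException {
        if (args.length < 3) {
            System.err.println("usage: VersionProbe <jdbc-url> <user> <password>");
            return;
        }
        // The driver jar matching this URL must be on the classpath.
        try (Connection conn = DriverManager.getConnection(args[0], args[1], args[2])) {
            System.out.println(describe(conn.getMetaData()));
        }
    }
}
```

Running it once per driver jar against the same database, and once per
database with the same jar, should show whether the driver or the server is
the part that differs.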



On Tue, Jan 21, 2020 at 3:21 PM Flavio Pompermaier <[hidden email]>
wrote:

> Hi all,
> I'm happy to see a lot of interest in easing the integration with JDBC data
> sources. Maybe this is a rare situation (though not in my experience), but
> what if I have to connect to the same type of source (e.g. MySQL) with two
> incompatible versions? How can I load the two (or more) connector jars
> without causing conflicts?

Re: [DISCUSS] FLIP-92: JDBC catalog and Postgres catalog

bowen.li
Hi all,

If there are no more comments, I would like to kick off a vote for this FLIP
[1].

FYI, the FLIP number has changed to 93, since there was a race condition in
taking 92.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-93%3A+JDBC+catalog+and+Postgres+catalog

On Wed, Jan 22, 2020 at 11:05 AM Bowen Li <[hidden email]> wrote:

> Hi Flavio,
>
> First, this is a general question about how flink-jdbc is set up rather than
> one specific to the JDBC catalog, so it is better discussed in its own thread.
>
> But to quickly answer your question: you need to find out where the
> incompatibility is. It can be in 1) the JDBC drivers or 2) the databases
> themselves. 1) is fairly stable and backward compatible. 2) normally has to
> do with your queries, not the driver.
>
>
>
> On Tue, Jan 21, 2020 at 3:21 PM Flavio Pompermaier <[hidden email]>
> wrote:
>
>> Hi all,
>> I'm happy to see a lot of interest in easing the integration with JDBC data
>> sources. Maybe this is a rare situation (though not in my experience), but
>> what if I have to connect to the same type of source (e.g. MySQL) with two
>> incompatible versions? How can I load the two (or more) connector jars
>> without causing conflicts?
>>
>> On Tue, Jan 14, 2020, 23:32 Bowen Li <[hidden email]> wrote:
>>
>> > Hi devs,
>> >
>> > I've updated the wiki according to feedbacks. Please take another look.
>> >
>> > Thanks!
>> >
>> >
>> > On Fri, Jan 10, 2020 at 2:24 PM Bowen Li <[hidden email]> wrote:
>> >
>> > > Thanks everyone for the prompt feedback. Please see my response below.
>> > >
>> > > > In Postgress, the TIME/TIMESTAMP WITH TIME ZONE has the
>> > > java.time.Instant semantic, and should be mapped to Flink's
>> > TIME/TIMESTAMP
>> > > WITH LOCAL TIME ZONE
>> > >
>> > > Zhenghua, you are right that pg's 'timestamp with timezone' should be
>> > > translated into flink's 'timestamp with local timezone'. I don't find
>> > 'time
>> > > with (local) timezone' though, so we may not support that type from
>> pg in
>> > > Flink.
>> > >
>> > > > I suggest that the parameters can be completely consistent with the
>> > > JDBCTableSource / JDBCTableSink. If you take a look to JDBC api:
>> > > "DriverManager.getConnection".
>> > > That allow "default db, username, pwd" things optional. They can
>> included
>> > > in URL. Of course JDBC api also allows establishing connections to
>> > > different databases in a db instance. So I think we don't need
>> provide a
>> > > "base_url", we can just provide a real "url". To be consistent with
>> JDBC
>> > > api.
>> > >
>> > > Jingsong, what I'm saying is a builder can be added on demand later if
>> > > there's enough user requesting it, and doesn't need to be a core part
>> of
>> > > the FLIP.
>> > >
>> > > Besides, unfortunately Postgres doesn't allow changing databases via
>> > JDBC.
>> > >
>> > > JDBC provides different connection options as you mentioned, but I'd
>> > > like to keep our design and API simple and avoid having to handle
>> > > extra parsing logic. And it doesn't shut the door on what you proposed
>> > > as a future effort.
>> > >
>> > > > Since the PostgreSQL does not have catalog but schema under
>> database,
>> > > why not mapping the PG-database to Flink catalog and PG-schema to
>> Flink
>> > > database
>> > >
>> > > Danny, because 1) there are frequent use cases where users want to
>> switch
>> > > databases or referencing objects across databases in a pg instance 2)
>> > > schema is an optional namespace layer in pg, it always has a default
>> > value
>> > > ("public") and can be invisible to users if they'd like to as shown in
>> > the
>> > > FLIP 3) as you mentioned it is specific to postgres, and I don't feel
>> > it's
>> > > necessary to map Postgres substantially different than others DBMSs
>> with
>> > > additional complexity
>> > >
>> > > >'base_url' configuration: We are following the configuration format
>> > > guideline [1] which suggest to use dash (-) instead of underline (_).
>> And
>> > > I'm a little confused the meaning of "base_url" at the first glance,
>> > > another idea is split it into several configurations: 'driver',
>> > 'hostname',
>> > > 'port'.
>> > >
>> > > Jark, I agreed we should use "base-url" in yaml config.
>> > >
>> > > I'm not sure about having hostname and port separately because you can
>> > > specify multiple hosts with ports in jdbc, like
>> > > "jdbc:dbms/host1:port1,host2:port2/", for connection failovers.
>> > Separating
>> > > them would make configurations harder.
>> > >
>> > > I will add clear doc and example to avoid any possible confusion.
>> > >
>> > > > 'default-database' is optional, then which database will be used or
>> > what
>> > > is the behavior when the default database is not selected.
>> > >
>> > > This should be DBMS specific. For postgres, it will be the <access id>
>> > > database.
>> > >
>> > >
>> > > On Thu, Jan 9, 2020 at 9:48 PM Zhenghua Gao <[hidden email]> wrote:
>> > >
>> > >> Hi Bowen, Thanks for driving this.
>> > >> I think it would be very convenience to use tables in external DBs
>> with
>> > >> JDBC Catalog.
>> > >>
>> > >> I have one concern about "Flink-Postgres Data Type Mapping" part:
>> > >>
>> > >> In Postgress, the TIME/TIMESTAMP WITH TIME ZONE has the
>> > java.time.Instant
>> > >> semantic,
>> > >> and should be mapped to Flink's TIME/TIMESTAMP WITH LOCAL TIME ZONE
>> > >>
>> > >> *Best Regards,*
>> > >> *Zhenghua Gao*
>> > >>
>> > >>
>> > >> On Fri, Jan 10, 2020 at 11:09 AM Jingsong Li <[hidden email]
>> >
>> > >> wrote:
>> > >>
>> > >> > Hi Bowen, thanks for reply and updating.
>> > >> >
>> > >> > > I don't see much value in providing a builder for jdbc catalogs,
>> as
>> > >> they
>> > >> > only have 4 or 5 required params, no optional ones. I prefer users
>> > just
>> > >> > provide a base url without default db, usrname, pwd so we don't
>> need
>> > to
>> > >> > parse url all around, as I mentioned jdbc catalog may need to
>> > establish
>> > >> > connections to different databases in a db instance,
>> > >> >
>> > >> > I suggest that the parameters can be completely consistent with the
>> > >> > JDBCTableSource / JDBCTableSink.
>> > >> > If you take a look to JDBC api: "DriverManager.getConnection".
>> > >> > That allow "default db, username, pwd" things optional. They can
>> > >> included
>> > >> > in URL. Of course JDBC api also allows establishing connections to
>> > >> > different databases in a db instance.
>> > >> > So I think we don't need provide a "base_url", we can just provide
>> a
>> > >> real
>> > >> > "url".
>> > >> > To be consistent with JDBC api.
>> > >> >
>> > >> > Best,
>> > >> > Jingsong Lee
>> > >> >
>> > >> > On Fri, Jan 10, 2020 at 10:34 AM Jark Wu <[hidden email]> wrote:
>> > >> >
>> > >> > > Thanks Bowen for the reply,
>> > >> > >
>> > >> > > A user-facing JDBCCatalog and 'catalog.type' = 'jdbc'  sounds
>> good
>> > to
>> > >> me.
>> > >> > >
>> > >> > > I have some other minor comments when I went through the updated
>> > >> > > documentation:
>> > >> > >
>> > >> > > 1) 'base_url' configuration: We are following the configuration
>> > format
>> > >> > > guideline [1] which suggest to use dash (-) instead of underline
>> > (_).
>> > >> > >      And I'm a little confused the meaning of "base_url" at the
>> > first
>> > >> > > glance, another idea is split it into several configurations:
>> > >> 'driver',
>> > >> > > 'hostname', 'port'.
>> > >> > >
>> > >> > > 2) 'default-database' is optional, then which database will be
>> used
>> > or
>> > >> > what
>> > >> > > is the behavior when the default database is not selected.
>> > >> > >
>> > >> > > 3) a builder for jdbc catalogs: I agree with Jingsong to provide
>> a
>> > >> > builder.
>> > >> > > Because there is optional configuration here (the default
>> database),
>> > >> > >    and providind Builder as the API will be easier for evolution,
>> > I'm
>> > >> not
>> > >> > > sure we won't add/modify parameters in the future.
>> > >> > >
>> > >> > > [1]:
>> > >> > >
>> > >> > >
>> > >> >
>> > >>
>> >
>> https://flink.apache.org/contributing/code-style-and-quality-components.html#configuration-changes
>> > >> > >
>> > >> > > On Fri, 10 Jan 2020 at 04:52, Bowen Li <[hidden email]>
>> wrote:
>> > >> > >
>> > >> > > > Hi Jark and Jingsong,
>> > >> > > >
>> > >> > > > Thanks for your review. Please see my reply in line.
>> > >> > > >
>> > >> > > > > why introducing a `PostgresJDBCCatalog`, not a generic
>> > >> `JDBCCatalog`
>> > >> > > > (catalog.type = 'postgres' vs 'jdbc') ?
>> > >> > > >
>> > >> > > > Thanks for the reminding and I looked at JDBCDialect. A
>> generic,
>> > >> > > > user-facing JDBCCatalog with catalog.type = jdbc and find
>> specific
>> > >> db
>> > >> > > > implementations (pg v.s. mysql v.s. ...) is more aligned with
>> how
>> > >> jdbc
>> > >> > > > sink/source is handled, indeed. However, the catalogs would
>> also
>> > >> need
>> > >> > to
>> > >> > > > execute the query and parse query results in a db-dependent
>> way.
>> > >> E.g.
>> > >> > > jdbc
>> > >> > > > catalog needs to establish connections to different databases
>> > >> within a
>> > >> > db
>> > >> > > > instance on demand. So just having JDBCDialect won't be enough.
>> > >> > > >
>> > >> > > > I think we can do the following:
>> > >> > > >   - provide a user-facing JDBCCatalog, composing a db-specific
>> > impl
>> > >> > like
>> > >> > > > PostgresJDBCCatalog and MySQLJDBCCatalog. Users still specify
>> > >> "jdbc" as
>> > >> > > > type in both Table API and SQL CLI, internally it will create a
>> > >> > > db-specific
>> > >> > > > impl depending on jdbc base url.
>> > >> > > >   - some statements can reside in JDBCDialect. Query execution
>> and
>> > >> > result
>> > >> > > > parsing logic would be located in db-specific impls.
>> > >> > > >
>> > >> > > > - We can provide a Builder for Catalog, In my opinion,
>> > >> defaultDatabase,
>> > >> > > > username, pwd can be included in JDBC DB url.
>> > >> > > >
>> > >> > > > I don't see much value in providing a builder for jdbc
>> catalogs,
>> > as
>> > >> > they
>> > >> > > > only have 4 or 5 required params, no optional ones. I prefer
>> users
>> > >> just
>> > >> > > > provide a base url without default db, usrname, pwd so we don't
>> > >> need to
>> > >> > > > parse url all around, as I mentioned jdbc catalog may need to
>> > >> establish
>> > >> > > > connections to different databases in a db instance,
>> > >> > > >
>> > >> > > > - About timestamp and time, write down the specific Flink
>> > precision
>> > >> of
>> > >> > > > Postgres?
>> > >> > > >
>> > >> > > > I've documented that. It's 0-6
>> > >> > > >
>> > >> > > > - I think there is a part missing in your document, that is
>> how to
>> > >> use
>> > >> > > this
>> > >> > > > catalog. If you can write a complete example, I think it will
>> be
>> > >> much
>> > >> > > > clearer.
>> > >> > > >
>> > >> > > > I added some examples in both table api and SQL Cli. It will
>> be no
>> > >> > > > different from existing catalogs.
>> > >> > > >
>> > >> > > > - So a thing is what TableFactory will this catalog use? For
>> > >> example,
>> > >> > > > JDBCTableSourceSinkFactory has different parameters for source
>> or
>> > >> sink?
>> > >> > > How
>> > >> > > > do you think about it?
>> > >> > > >
>> > >> > > > This catalog will directly call JDBCTableSourceSinkFactory
>> without
>> > >> > going
>> > >> > > > thru service discovery because we are sure it's a jdbc table. I
>> > >> added
>> > >> > it
>> > >> > > to
>> > >> > > > the doc.
>> > >> > > >
>> > >> > > > For the different params besides schema, as we discussed
>> offline,
>> > >> > > > unfortunately we can't do anything right now until Flink
>> DDL/DML
>> > are
>> > >> > able
>> > >> > > > to distinguish 3 types of params - external data's metada,
>> > >> source/sink
>> > >> > > > runtime params, and Flink semantics params. The latter two
>> can't
>> > be
>> > >> > > > provided by catalogs. The problem is actually general to all
>> > >> catalogs,
>> > >> > > not
>> > >> > > > just JDBCCatalog. I'm pushing for such an effort to solve it.
>> At
>> > >> this
>> > >> > > > moment we can only use some default params for some cases, and
>> the
>> > >> > other
>> > >> > > > cases cannot take advantage of the JDBC catalog and users still
>> > >> have to
>> > >> > > > write DDL manually.
>> > >> > > >
>> > >> > > > Thanks,
>> > >> > > > Bowen
>> > >> > > >
>> > >> > > > On Wed, Jan 8, 2020 at 7:46 PM Jingsong Li <
>> > [hidden email]>
>> > >> > > wrote:
>> > >> > > >
>> > >> > > > > Thanks Bowen for driving this,
>> > >> > > > >
>> > >> > > > > +1 for this, The DDL schema definition is a headache for
>> users,
>> > >> and
>> > >> > > > catalog
>> > >> > > > > is a solution to this problem.
>> > >> > > > >
>> > >> > > > > I have some questions and suggestions:
>> > >> > > > >
>> > >> > > > > - We can provide a Builder for Catalog, In my opinion,
>> > >> > defaultDatabase,
>> > >> > > > > username, pwd can be included in JDBC DB url.
>> > >> > > > >
>> > >> > > > > - About timestamp and time, write down the specific Flink
>> > >> precision
>> > >> > of
>> > >> > > > > Postgres?
>> > >> > > > >
>> > >> > > > > - I think there is a part missing in your document, that is
>> how
>> > to
>> > >> > use
>> > >> > > > this
>> > >> > > > > catalog. If you can write a complete example, I think it
>> will be
>> > >> much
>> > >> > > > > clearer.
>> > >> > > > >
>> > >> > > > > - So a thing is what TableFactory will this catalog use? For
>> > >> example,
>> > >> > > > > JDBCTableSourceSinkFactory has different parameters for
>> source
>> > or
>> > >> > sink?
>> > >> > > > How
>> > >> > > > > do you think about it?
>> > >> > > > >
>> > >> > > > > Best,
>> > >> > > > > Jingsong Lee
>> > >> > > > >
>> > >> > > > > On Thu, Jan 9, 2020 at 11:33 AM Jark Wu <[hidden email]>
>> > wrote:
>> > >> > > > >
>> > >> > > > > > Thanks Bowen for driving this.
>> > >> > > > > >
>> > >> > > > > > +1 to this feature.
>> > >> > > > > >
>> > >> > > > > > My concern is that why introducing a `PostgresJDBCCatalog`,
>> > not
>> > >> a
>> > >> > > > generic
>> > >> > > > > > `JDBCCatalog` (catalog.type = 'postgres' vs 'jdbc') ?
>> > >> > > > > > From my understanding, JDBC catalog is similar to JDBC
>> > >> source/sink.
>> > >> > > For
>> > >> > > > > > JDBC source/sink, we have a generic
>> > >> > > > > > implementation for JDBC and delegate operations to
>> > JDBCDialect.
>> > >> > > > Different
>> > >> > > > > > driver may have different implementation of
>> > >> > > > > > JDBCDialect, e.g `quoteIdentifier()`.
>> > >> > > > > >
>> > >> > > > > > For the JDBC catalog, I guess we can do it in the same way, i.e. a
>> > >> > > > > > generic JDBCCatalog implementation that delegates
>> > >> > > > > > operations to JDBCDialect, and we will have `listDataBase()` and
>> > >> > > > > > `listTables()` interfaces in JDBCDialect. The benefits are:
>> > >> > > > > > 0) reuse the existing `JDBCDialect`; I guess JDBCCatalog also needs
>> > >> > > > > > to quote identifiers.
>> > >> > > > > > 1) we can easily support a new database catalog (e.g. mysql) by
>> > >> > > > > > implementing new dialects (e.g. MySQLDialect).
>> > >> > > > > > 2) this keeps the same behavior as the JDBC source/sink, i.e.
>> > >> > > > > > connector.type=jdbc, catalog.type=jdbc
>> > >> > > > > >
>> > >> > > > > > Best,
>> > >> > > > > > Jark
>> > >> > > > > >
>> > >> > > > > >
>> > >> > > > > > On Thu, 9 Jan 2020 at 08:33, Bowen Li <[hidden email]> wrote:
>> > >> > > > > >
>> > >> > > > > > > Hi dev,
>> > >> > > > > > >
>> > >> > > > > > > I'd like to kick off a discussion on adding JDBC catalogs,
>> > >> > > > > > > specifically Postgres catalog in Flink [1].
>> > >> > > > > > >
>> > >> > > > > > > Currently users have to manually create schemas in Flink source/sink
>> > >> > > > > > > mirroring tables in their relational databases in use cases like JDBC
>> > >> > > > > > > read/write and consuming CDC. Many users have complained about the
>> > >> > > > > > > unnecessary, redundant, manual work. Any mismatch can lead to a
>> > >> > > > > > > failing Flink job at runtime instead of compile time. All of this has
>> > >> > > > > > > been quite unpleasant, resulting in a broken user experience.
>> > >> > > > > > >
>> > >> > > > > > > We want to provide a JDBC catalog interface and a Postgres
>> > >> > > > > > > implementation for Flink as a start to connect to all kinds of
>> > >> > > > > > > relational databases, enabling Flink SQL to 1) retrieve table schemas
>> > >> > > > > > > automatically without requiring users to write duplicate DDL, and
>> > >> > > > > > > 2) check for schema errors at compile time.
>> > >> > > > > > > It will greatly streamline the user experience when using Flink to
>> > >> > > > > > > deal with popular relational databases like Postgres, MySQL, MariaDB,
>> > >> > > > > > > AWS Aurora, etc.
>> > >> > > > > > >
>> > >> > > > > > > Note that the problem and solution are actually very general to Flink
>> > >> > > > > > > when connecting to all kinds of external systems. We just focus on
>> > >> > > > > > > solving that for relational databases in this FLIP.
>> > >> > > > > > >
>> > >> > > > > > > Thanks,
>> > >> > > > > > > Bowen
>> > >> > > > > > >
>> > >> > > > > > > [1]
>> > >> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+JDBC+catalog+and+Postgres+catalog
>> > >> > > > > > >
>> > >> > > > > >
>> > >> > > > >
>> > >> > > > >
>> > >> > > > > --
>> > >> > > > > Best, Jingsong Lee
>> > >> > > > >
>> > >> > > >
>> > >> > >
>> > >> >
>> > >> >
>> > >> > --
>> > >> > Best, Jingsong Lee
>> > >> >
>> > >>
>> > >
>> >
>>
>