Flink SQL Cdc with schema changing

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink SQL Cdc with schema changing

taher koitawala-2
Hi All,
         I have a CDC use case where I want to capture and process debezium
logs that are streamed to Kafka via Debezium. As per all the flink examples
we have to pre create the schema of the tables where I want to perform a
write.

However my question is what if there is an alter table modify column data
type query that hits the source RDBMS, how does flink handle that schema
change and what changes are supported. If someone can give a full example
it will be very very helpful.


Regards,
Taher Koitawala
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL Cdc with schema changing

Jark Wu-2
Hi Taher,

Currently, Flink (SQL) CDC doesn't support automatic schema change
and doesn't support to consume schema change events in source.
But you can upgrade schema manually, for example, if you have a table
with columns [a, b, c], you can define a flink table t1 with these 3
columns.
When you add new column in source RDBMS, the Flink SQL job on t1
should work fine if you are using format 'debezium-json' or
'debezium-avro-confluent',
because they supports schema compatibility.
When you are notified there is a schema change in the source RDBMS,
then you can upgrade your Flink SQL DDL and job to include the added
column,
and consume from the previous savepoint [1].

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint



On Wed, 5 May 2021 at 13:34, Taher Koitawala <[hidden email]> wrote:

> Hi All,
>          I have a CDC use case where I want to capture and process debezium
> logs that are streamed to Kafka via Debezium. As per all the flink examples
> we have to pre create the schema of the tables where I want to perform a
> write.
>
> However my question is what if there is an alter table modify column data
> type query that hits the source RDBMS, how does flink handle that schema
> change and what changes are supported. If someone can give a full example
> it will be very very helpful.
>
>
> Regards,
> Taher Koitawala
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL Cdc with schema changing

taher koitawala-2
Thank you for the reply Jack Wu, however that does not satisfy my
requirements, my use case is to have something that supports a schema drift
over avro format. Column addition and column datatype change both types of
variations is what I am trying to solve for. Either way thanks for
the help, much appreciated.

Regards,
Taher Koitawala

On Wed, May 5, 2021 at 3:53 PM Jark Wu <[hidden email]> wrote:

> Hi Taher,
>
> Currently, Flink (SQL) CDC doesn't support automatic schema change
> and doesn't support to consume schema change events in source.
> But you can upgrade schema manually, for example, if you have a table
> with columns [a, b, c], you can define a flink table t1 with these 3
> columns.
> When you add new column in source RDBMS, the Flink SQL job on t1
> should work fine if you are using format 'debezium-json' or
> 'debezium-avro-confluent',
> because they supports schema compatibility.
> When you are notified there is a schema change in the source RDBMS,
> then you can upgrade your Flink SQL DDL and job to include the added
> column,
> and consume from the previous savepoint [1].
>
> Best,
> Jark
>
> [1]:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint
>
>
>
> On Wed, 5 May 2021 at 13:34, Taher Koitawala <[hidden email]> wrote:
>
> > Hi All,
> >          I have a CDC use case where I want to capture and process
> debezium
> > logs that are streamed to Kafka via Debezium. As per all the flink
> examples
> > we have to pre create the schema of the tables where I want to perform a
> > write.
> >
> > However my question is what if there is an alter table modify column data
> > type query that hits the source RDBMS, how does flink handle that schema
> > change and what changes are supported. If someone can give a full example
> > it will be very very helpful.
> >
> >
> > Regards,
> > Taher Koitawala
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL Cdc with schema changing

Jark Wu-2
Hi Taher,

Could you explain a bit more your use case and what do you expect Flink SQL
to support?
That could help us to better understand and plan the future roadmap.

Best,
Jark

On Wed, 5 May 2021 at 19:42, Taher Koitawala <[hidden email]> wrote:

> Thank you for the reply Jack Wu, however that does not satisfy my
> requirements, my use case is to have something that supports a schema drift
> over avro format. Column addition and column datatype change both types of
> variations is what I am trying to solve for. Either way thanks for
> the help, much appreciated.
>
> Regards,
> Taher Koitawala
>
> On Wed, May 5, 2021 at 3:53 PM Jark Wu <[hidden email]> wrote:
>
> > Hi Taher,
> >
> > Currently, Flink (SQL) CDC doesn't support automatic schema change
> > and doesn't support to consume schema change events in source.
> > But you can upgrade schema manually, for example, if you have a table
> > with columns [a, b, c], you can define a flink table t1 with these 3
> > columns.
> > When you add new column in source RDBMS, the Flink SQL job on t1
> > should work fine if you are using format 'debezium-json' or
> > 'debezium-avro-confluent',
> > because they supports schema compatibility.
> > When you are notified there is a schema change in the source RDBMS,
> > then you can upgrade your Flink SQL DDL and job to include the added
> > column,
> > and consume from the previous savepoint [1].
> >
> > Best,
> > Jark
> >
> > [1]:
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint
> >
> >
> >
> > On Wed, 5 May 2021 at 13:34, Taher Koitawala <[hidden email]> wrote:
> >
> > > Hi All,
> > >          I have a CDC use case where I want to capture and process
> > debezium
> > > logs that are streamed to Kafka via Debezium. As per all the flink
> > examples
> > > we have to pre create the schema of the tables where I want to perform
> a
> > > write.
> > >
> > > However my question is what if there is an alter table modify column
> data
> > > type query that hits the source RDBMS, how does flink handle that
> schema
> > > change and what changes are supported. If someone can give a full
> example
> > > it will be very very helpful.
> > >
> > >
> > > Regards,
> > > Taher Koitawala
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL Cdc with schema changing

taher koitawala-2
Sure, here's the use case that I want to solve. I want to stream CDC
records that are inserted in kafka via debezium. We want to capture all
events of Debezium, with an alter table add column, modify column, inserts,
updates and deletes over an avro based file format which can then be
queried. For now we are also evaluating if we can write direct records to
Iceberg format and try to solve.

Further leads on iceberg will be appreciated.

It is a very common practise these days to bring over change records with
evolving schema. We have HUDI as an option however that is a Spark based
approach. Personal opinion dont want to offend anyone, however I think
Flink is way way way better than spark when to comes to streaming. Another
problem with Hudi is the compaction process that it needs.

Major goal is to support CDC natively via streams. When a user queries the
data the goal is to get kind of a lock over the dataset where the user can
see committed data only and changes can still be streamed whilst the user
queries that data.

In a nutshell I am trying to design a CDC system over with flink as the
major stream processing engine.









On Wed, May 5, 2021 at 5:40 PM Jark Wu <[hidden email]> wrote:

> Hi Taher,
>
> Could you explain a bit more your use case and what do you expect Flink SQL
> to support?
> That could help us to better understand and plan the future roadmap.
>
> Best,
> Jark
>
> On Wed, 5 May 2021 at 19:42, Taher Koitawala <[hidden email]> wrote:
>
> > Thank you for the reply Jack Wu, however that does not satisfy my
> > requirements, my use case is to have something that supports a schema
> drift
> > over avro format. Column addition and column datatype change both types
> of
> > variations is what I am trying to solve for. Either way thanks for
> > the help, much appreciated.
> >
> > Regards,
> > Taher Koitawala
> >
> > On Wed, May 5, 2021 at 3:53 PM Jark Wu <[hidden email]> wrote:
> >
> > > Hi Taher,
> > >
> > > Currently, Flink (SQL) CDC doesn't support automatic schema change
> > > and doesn't support to consume schema change events in source.
> > > But you can upgrade schema manually, for example, if you have a table
> > > with columns [a, b, c], you can define a flink table t1 with these 3
> > > columns.
> > > When you add new column in source RDBMS, the Flink SQL job on t1
> > > should work fine if you are using format 'debezium-json' or
> > > 'debezium-avro-confluent',
> > > because they supports schema compatibility.
> > > When you are notified there is a schema change in the source RDBMS,
> > > then you can upgrade your Flink SQL DDL and job to include the added
> > > column,
> > > and consume from the previous savepoint [1].
> > >
> > > Best,
> > > Jark
> > >
> > > [1]:
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint
> > >
> > >
> > >
> > > On Wed, 5 May 2021 at 13:34, Taher Koitawala <[hidden email]>
> wrote:
> > >
> > > > Hi All,
> > > >          I have a CDC use case where I want to capture and process
> > > debezium
> > > > logs that are streamed to Kafka via Debezium. As per all the flink
> > > examples
> > > > we have to pre create the schema of the tables where I want to
> perform
> > a
> > > > write.
> > > >
> > > > However my question is what if there is an alter table modify column
> > data
> > > > type query that hits the source RDBMS, how does flink handle that
> > schema
> > > > change and what changes are supported. If someone can give a full
> > example
> > > > it will be very very helpful.
> > > >
> > > >
> > > > Regards,
> > > > Taher Koitawala
> > > >
> > >
> >
>