Hi everyone,
I'm happy to present the results of long discussions that we had internally. Jark, Dawid, Aljoscha, Kurt, Jingsong, me, and many more have contributed to this design document.

We would like to propose new long-term table source and table sink interfaces:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

This is a requirement for FLIP-105 and for finalizing FLIP-32.

The goals of this FLIP are:

- Simplify the current interface architecture:
  - Merge upsert, retract, and append sinks.
  - Unify batch and streaming sources.
  - Unify batch and streaming sinks.

- Allow sources to produce a changelog (see the sketch after this mail):
  - UpsertTableSources have been requested a lot by users. Now is the time to open up the internal planner capabilities via the new interfaces.
  - According to FLIP-105, we would like to support changelogs for processing formats such as Debezium.

- Don't rely on the DataStream API for sources and sinks:
  - According to FLIP-32, the Table API and SQL should be independent of the DataStream API, which is why the `table-common` module has no dependencies on `flink-streaming-java`.
  - Source and sink implementations should only depend on the `table-common` module after FLIP-27.
  - Until FLIP-27 is ready, we still put most of the interfaces in `table-common` and strictly separate interfaces that communicate with the planner from the actual runtime readers/writers.

- Implement efficient sources and sinks without planner dependencies:
  - Make Blink's internal data structures available to connectors.
  - Introduce stable interfaces for data structures that can be marked as `@PublicEvolving`.
  - Only require dependencies on `flink-table-common` in the future.

It finalizes the concept of dynamic tables and considers how all source/sink related classes play together.

We look forward to your feedback.

Regards,
Timo
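To make the changelog goal above concrete, here is a condensed sketch of how a unified scan source could declare the changes it produces. The names follow the FLIP, but the signatures are simplified stand-ins rather than the exact proposal:

import java.util.EnumSet;
import java.util.Set;

// Simplified stand-ins for the classes proposed in the FLIP.

/** The kind of change that a single row encodes in a changelog. */
enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

/** The set of changes that a source produces (or a sink accepts). */
final class ChangelogMode {
    private final Set<RowKind> kinds;

    private ChangelogMode(Set<RowKind> kinds) { this.kinds = kinds; }

    /** Bounded (batch) scans and classic append streams are insert-only. */
    static ChangelogMode insertOnly() {
        return new ChangelogMode(EnumSet.of(RowKind.INSERT));
    }

    /** An upsert source, e.g. a compacted log keyed by primary key. */
    static ChangelogMode upsert() {
        return new ChangelogMode(
            EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER, RowKind.DELETE));
    }

    boolean contains(RowKind kind) { return kinds.contains(kind); }
}

/** A source that scans an external system, unified for batch and streaming. */
interface ScanTableSource {
    /** Tells the planner which change kinds the emitted rows may carry. */
    ChangelogMode getChangelogMode();
}

With this, an append source, an upsert source, and a retract source differ only in the ChangelogMode they report, which is what allows merging the previously separate source/sink flavors.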
Hi Timo,
Thank you and the others for the effort to prepare this FLIP. The FLIP LGTM generally.

+1 for moving the Blink data structures to table-common; they will be useful for UDFs too in the future.

A little question: do we plan to support the new interfaces and data types in the legacy planner, or only in the Blink planner?

Using primary keys from the DDL instead of key information derived from each query is also a good idea; we have met some use cases where the derivation does not work very well.

This FLIP also makes the dependencies between the table modules much clearer. I like it very much.

On Tue, Mar 17, 2020 at 1:36 AM, Timo Walther <[hidden email]> wrote:
> [...]

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel: +86-15650713730
Email: [hidden email]; [hidden email]
Hi Benchao,
this is a very good question. I will update the FLIP accordingly.

The legacy planner will not support the new interfaces; it will only support the old ones. With the next release, I think the Blink planner is stable enough to become the default planner as well.

Regards,
Timo

On 18.03.20 08:45, Benchao Li wrote:
> [...]
Hi everyone,
I received some questions about how the new interfaces play together with formats and their factories. Furthermore, for MySQL or Postgres CDC logs, the format should be able to return a `ChangelogMode`.

I also incorporated the feedback on the factory design in general and added a new section `Factory Interfaces` to the design document. It should help with understanding the big picture and how the concepts connect. A small sketch of the format side follows below.

Please let me know what you think.

Thanks,
Timo

On 18.03.20 13:43, Timo Walther wrote:
> [...]
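To illustrate what "the format returns a `ChangelogMode`" could mean in practice, here is a hypothetical, condensed sketch. It reuses the ChangelogMode stand-in from the sketch under the first mail; the method names are illustrative, not the exact factory design from the document:

/** A decoding format, e.g. for Debezium JSON, discovered via a factory. */
interface DecodingFormat<T> {
    /**
     * The change kinds this format can emit, e.g. inserts plus updates and
     * deletes for a CDC log. The planner validates the query against it.
     */
    ChangelogMode getChangelogMode();

    /** Creates the runtime decoder; the produced data type is omitted here. */
    T createRuntimeDecoder();
}

/** Factory through which a connector looks up a format by identifier. */
interface DeserializationFormatFactory {
    /** Unique identifier, e.g. "debezium-json" (illustrative value). */
    String factoryIdentifier();

    DecodingFormat<?> createDecodingFormat();
}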
Hi Timo,
Thank you for the proposal. I think it is an important improvement that will benefit many parts of the Table API. The proposal looks really good to me, and personally I would be comfortable with voting on the current state.

Best,

Dawid

On 23/03/2020 18:53, Timo Walther wrote:
> [...]
Hi Timo,
Thanks for the proposal. I completely agree that the current Table connectors could be simplified quite a bit. I haven't finished reading everything, but here are some quick thoughts.

Actually, to me the biggest question is: why should there be two different connector systems for DataStream and Table? What is the fundamental reason that prevents us from merging them into one?

The basic functionality of a connector is to provide the capabilities to do IO and serde. Conceptually, Table connectors should just be DataStream connectors that deal with Rows. It seems that quite a few of the special connector requirements are just a specific way to do IO / serde. Taking SupportsFilterPushDown as an example, imagine we have the following interface:

interface FilterableSource<PREDICATE> {
    void applyFilterable(Supplier<PREDICATE> predicate);
}

And if a ParquetSource would like to support filtering, it would become:

class ParquetSource implements Source, FilterableSource<FilterPredicate> {
    ...
}

For Table, one just needs to provide a predicate supplier that converts an Expression into the specified predicate type (see the sketch below). This has a few benefits:
1. The same unified filtering API for sources, regardless of DataStream or Table.
2. DataStream users can now also use the ExpressionToPredicate supplier if they want to.

To summarize, my main point is that I am wondering if it is possible to have a single set of connector interfaces for both Table and DataStream, rather than having two hierarchies. I am not 100% sure if this would work, but if it works, this would be a huge win from both a code maintenance and a user experience perspective.

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 24, 2020 at 2:03 AM Dawid Wysakowicz <[hidden email]> wrote:
> [...]
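A minimal sketch of the supplier Becket describes could look like the following. Expression and FilterPredicate are stand-in types here (in practice they would come from flink-table and, e.g., Parquet), and the actual translation is left to a caller-provided function:

import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Stand-ins: Expression for flink-table's relational expressions,
// FilterPredicate for the source's native predicate type (e.g. Parquet's).
interface Expression { }
interface FilterPredicate { FilterPredicate and(FilterPredicate other); }

/** Lazily translates the filters pushed down by the planner. */
final class ExpressionToPredicate implements Supplier<FilterPredicate> {
    private final List<Expression> pushedFilters;
    private final Function<Expression, FilterPredicate> translator;

    ExpressionToPredicate(List<Expression> pushedFilters,
                          Function<Expression, FilterPredicate> translator) {
        this.pushedFilters = pushedFilters;
        this.translator = translator;
    }

    @Override
    public FilterPredicate get() {
        // Translate each relational expression and conjoin the results.
        return pushedFilters.stream()
                .map(translator)
                .reduce(FilterPredicate::and)
                .orElse(null); // no filters were pushed down
    }
}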
Thanks Timo for the design doc.
In general I'm +1 to this, with a minor comment.

Since we are introducing dozens of interfaces all at once, I'm not sure it's a good idea to annotate them with @PublicEvolving already. I can imagine these interfaces would only become stable after one or two major releases. Given that these interfaces will only be used by connector developers, how about we annotate them as @Internal first? After more try-outs and feedback from connector developers, we can improve those interfaces quickly and mark them @PublicEvolving once we are confident about them.

BTW, if I'm not mistaken, the end users will only see Row with the enhanced RowKind (sketched below). This is the only part that actually goes public, IMO.

Best,
Kurt

On Tue, Mar 24, 2020 at 9:24 AM Becket Qin <[hidden email]> wrote:
> [...]
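For reference, a minimal sketch of what such a Row with a RowKind could look like; this is a simplified stand-in for illustration, not the exact class proposed in the FLIP:

import java.util.Arrays;

/** Simplified stand-in: a Row that carries the kind of change it encodes. */
final class Row {

    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private final RowKind kind;
    private final Object[] fields;

    private Row(RowKind kind, Object[] fields) {
        this.kind = kind;
        this.fields = fields;
    }

    /** E.g. ofKind(RowKind.UPDATE_AFTER, "user-1", 42L) for an upsert. */
    static Row ofKind(RowKind kind, Object... fields) {
        return new Row(kind, fields);
    }

    RowKind getKind() { return kind; }

    Object getField(int pos) { return fields[pos]; }

    @Override
    public String toString() {
        return kind + Arrays.toString(fields);
    }
}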
Hi Becket,
Answering your question: we have the same intention, namely not to duplicate connectors between the DataStream and Table APIs. The interfaces proposed in the FLIP are a way to describe the relational properties of a source. The intention is, as you described, to translate all of those, expressed as expressions or other Table-specific structures, into a DataStream source. In other words, I think what we are doing here is in line with what you described.

Best,

Dawid

On 24/03/2020 02:23, Becket Qin wrote:
> [...]
@Becket: We totally agree that we don't need table-specific connectors at runtime. As Dawid said, the interfaces proposed here are just for communication with the planner. Once the properties (watermarks, computed columns, filters, projection, etc.) are negotiated, we can configure a regular Flink connector, e.g. by setting the watermark assigner and the deserialization schema of a Kafka connector (see the sketch below).

For better separation of concerns, Flink connectors should not include relational interfaces or depend on flink-table. That is the responsibility of the table source/sink.

@Kurt: I would like to mark them @PublicEvolving already because we need to deprecate the old interfaces as early as possible, and we cannot redirect to @Internal interfaces. They are not marked @Public, so we can still evolve them. But a core design shift should not happen again; it would leave a bad impression if we redesigned over and over again. Instead, we should be confident in the current change.

Regards,
Timo

On 24.03.20 09:20, Dawid Wysakowicz wrote:
> [...]
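A hedged sketch of that separation, assuming a hypothetical planner-side class KafkaDynamicSource; FlinkKafkaConsumer and DeserializationSchema are the existing DataStream-side APIs:

import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.types.Row;

/**
 * Hypothetical planner-side source: once the format and the pushed-down
 * properties are negotiated, it only configures a plain DataStream connector.
 */
final class KafkaDynamicSource {
    private final String topic;
    private final Properties consumerProps;
    private final DeserializationSchema<Row> negotiatedFormat;

    KafkaDynamicSource(String topic, Properties consumerProps,
                       DeserializationSchema<Row> negotiatedFormat) {
        this.topic = topic;
        this.consumerProps = consumerProps;
        this.negotiatedFormat = negotiatedFormat;
    }

    /** The runtime side: a regular Kafka connector without flink-table types. */
    FlinkKafkaConsumer<Row> createRuntimeConsumer() {
        return new FlinkKafkaConsumer<>(topic, negotiatedFormat, consumerProps);
    }
}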
+1. Thanks Timo for the design doc.
We could also consider @Experimental. But I am +1 for @PublicEvolving; we should be confident in the current change.

Best,
Jingsong Lee

On Tue, Mar 24, 2020 at 4:30 PM Timo Walther <[hidden email]> wrote:
> [...]
Hi Timo and Dawid,
It's really great that we have the same goal. I am actually wondering if we can go one step further to avoid some of the interfaces in Table as well. For example, if we have the FilterableSource, do we still need the FilterableTableSource? Should DynamicTableSource just become a Source<*Row*, SourceSplitT, EnumChkT>? Can you help me understand a bit more about the reason we need the following relational representation / wrapper interfaces v.s. the interfaces that we could put to the Source in FLIP-27? DynamicTableSource v.s. Source<Row, SourceSplitT, EnumChkT> SupportsFilterablePushDown v.s. FilterableSource SupportsProjectablePushDown v.s. ProjectableSource SupportsWatermarkPushDown v.s. WithWatermarkAssigner SupportsComputedColumnPushDown v.s. ComputedColumnDeserializer ScanTableSource v.s. ChangeLogDeserializer. LookUpTableSource v.s. LookUpSource Assuming we have all the interfaces on the right side, do we still need the interfaces on the left side? Note that the interfaces on the right can be used by both DataStream and Table. If we do this, there will only be one set of Source interfaces Table and DataStream, the only difference is that the Source for table will have some specific plugins and configurations. An omnipotent Source can implement all the the above interfaces and take a Deserializer that implements both ComputedColumnDeserializer and ChangeLogDeserializer. Would the SQL planner work with that? Thanks, Jiangjie (Becket) Qin On Tue, Mar 24, 2020 at 5:03 PM Jingsong Li <[hidden email]> wrote: > +1. Thanks Timo for the design doc. > > We can also consider @Experimental too. But I am +1 to @PublicEvolving, we > should be confident in the current change. > > Best, > Jingsong Lee > > On Tue, Mar 24, 2020 at 4:30 PM Timo Walther <[hidden email]> wrote: > > > @Becket: We totally agree that we don't need table specific connectors > > during runtime. As Dawid said, the interfaces proposed here are just for > > communication with the planner. Once the properties (watermarks, > > computed column, filters, projecttion etc.) are negotiated, we can > > configure a regular Flink connector. > > > > E.g. setting the watermark assigner and deserialization schema of a > > Kafka connector. > > > > For better separation of concerns, Flink connectors should not include > > relational interfaces and depend on flink-table. This is the > > responsibility of table source/sink. > > > > @Kurt: I would like to mark them @PublicEvolving already because we need > > to deprecate the old interfaces as early as possible. We cannot redirect > > to @Internal interfaces. They are not marked @Public, so we can still > > evolve them. But a core design shift should not happen again, it would > > leave a bad impression if we are redesign over and over again. Instead > > we should be confident in the current change. > > > > Regards, > > Timo > > > > > > On 24.03.20 09:20, Dawid Wysakowicz wrote: > > > Hi Becket, > > > > > > Answering your question, we have the same intention not to duplicate > > > connectors between datastream and table apis. The interfaces proposed > in > > > the FLIP are a way to describe relational properties of a source. The > > > intention is as you described to translate all of those expressed as > > > expressions or other Table specific structures into a DataStream > source. > > > In other words I think what we are doing here is in line with what you > > > described. 
Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 24, 2020 at 5:03 PM Jingsong Li <[hidden email]> wrote:

> +1. Thanks Timo for the design doc.
>
> We could also consider @Experimental. But I am +1 to @PublicEvolving; we
> should be confident in the current change.
>
> Best,
> Jingsong Lee
>
> On Tue, Mar 24, 2020 at 4:30 PM Timo Walther <[hidden email]> wrote:
>
> > @Becket: We totally agree that we don't need table-specific connectors
> > during runtime. As Dawid said, the interfaces proposed here are just for
> > communication with the planner. Once the properties (watermarks,
> > computed column, filters, projection, etc.) are negotiated, we can
> > configure a regular Flink connector.
> >
> > E.g. setting the watermark assigner and deserialization schema of a
> > Kafka connector.
> >
> > For better separation of concerns, Flink connectors should not include
> > relational interfaces and depend on flink-table. This is the
> > responsibility of table source/sink.
> >
> > @Kurt: I would like to mark them @PublicEvolving already because we need
> > to deprecate the old interfaces as early as possible. We cannot redirect
> > to @Internal interfaces. They are not marked @Public, so we can still
> > evolve them. But a core design shift should not happen again; it would
> > leave a bad impression if we redesign over and over again. Instead
> > we should be confident in the current change.
> >
> > Regards,
> > Timo
> >
> > On 24.03.20 09:20, Dawid Wysakowicz wrote:
> > > Hi Becket,
> > >
> > > Answering your question, we have the same intention not to duplicate
> > > connectors between the DataStream and Table APIs. The interfaces proposed in
> > > the FLIP are a way to describe relational properties of a source. The
> > > intention is, as you described, to translate all of those expressed as
> > > expressions or other Table-specific structures into a DataStream source.
> > > In other words, I think what we are doing here is in line with what you
> > > described.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 24/03/2020 02:23, Becket Qin wrote:
> > >> Hi Timo,
> > >>
> > >> Thanks for the proposal. I completely agree that the current Table
> > >> connectors could be simplified quite a bit. I haven't finished reading
> > >> everything, but here are some quick thoughts.
> > >>
> > >> Actually, to me the biggest question is why there should be two different
> > >> connector systems for DataStream and Table. What is the fundamental reason
> > >> preventing us from merging them into one?
> > >>
> > >> The basic functionality of a connector is to provide capabilities to do IO
> > >> and Serde. Conceptually, Table connectors should just be DataStream
> > >> connectors that are dealing with Rows. It seems that quite a few of the
> > >> special connector requirements are just a specific way to do IO / Serde.
> > >> Taking SupportsFilterPushDown as an example, imagine we have the following
> > >> interface:
> > >>
> > >> interface FilterableSource<PREDICATE> {
> > >>     void applyFilterable(Supplier<PREDICATE> predicate);
> > >> }
> > >>
> > >> And if a ParquetSource would like to support filtering, it would become:
> > >>
> > >> class ParquetSource implements Source, FilterableSource<FilterPredicate> {
> > >>     ...
> > >> }
> > >>
> > >> For Table, one just needs to provide a predicate supplier that converts an
> > >> Expression to the specified predicate type. This has a few benefits:
> > >> 1. Same unified API for filterable sources, regardless of DataStream or
> > >> Table.
> > >> 2. The DataStream users now can also use the ExpressionToPredicate
> > >> supplier if they want to.
> > >>
> > >> To summarize, my main point is that I am wondering if it is possible to
> > >> have a single set of connector interfaces for both Table and DataStream,
> > >> rather than having two hierarchies. I am not 100% sure if this would work,
> > >> but if it works, this would be a huge win from both a code maintenance and a
> > >> user experience perspective.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On Tue, Mar 24, 2020 at 2:03 AM Dawid Wysakowicz <[hidden email]>
> > >> wrote:
> > >>
> > >>> Hi Timo,
> > >>>
> > >>> Thank you for the proposal. I think it is an important improvement that
> > >>> will benefit many parts of the Table API. The proposal looks really good
> > >>> to me and personally I would be comfortable with voting on the current
> > >>> state.
> > >>>
> > >>> Best,
> > >>>
> > >>> Dawid
> > >>>
> > >>> On 23/03/2020 18:53, Timo Walther wrote:
> > >>>> Hi everyone,
> > >>>>
> > >>>> I received some questions around how the new interfaces play together
> > >>>> with formats and their factories.
> > >>>>
> > >>>> Furthermore, for MySQL or Postgres CDC logs, the format should be able
> > >>>> to return a `ChangelogMode`.
> > >>>>
> > >>>> Also, I incorporated the feedback around the factory design in general.
> > >>>>
> > >>>> I added a new section `Factory Interfaces` to the design document.
> > >>>> This should be helpful to understand the big picture and connect
> > >>>> the concepts.
> > >>>>
> > >>>> Please let me know what you think.
> > >>>>
> > >>>> Thanks,
> > >>>> Timo
> > >>>>
> > >>>> On 18.03.20 13:43, Timo Walther wrote:
> > >>>>> Hi Benchao,
> > >>>>>
> > >>>>> this is a very good question. I will update the FLIP about this.
> > >>>>>
> > >>>>> The legacy planner will not support the new interfaces. It will only
> > >>>>> support the old interfaces. With the next release, I think the Blink
> > >>>>> planner is stable enough to be the default one as well.
> > >>>>>
> > >>>>> Regards,
> > >>>>> Timo
>
> --
> Best, Jingsong Lee
|
Hi Becket,
I don't think DataStream should see SQL-specific concepts such as Filtering or ComputedColumn. It's better for those to stay within the SQL area and be translated into more generic concepts when lowering to the DataStream/runtime layer, for example using a MapFunction to represent computed column logic.
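As a rough illustration (the column expression and field indices here are invented for the example), a computed column such as cost AS price * quantity could be compiled by the planner into a plain MapFunction that the runtime executes without any SQL knowledge:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.Row;

// Planner-generated function appending the computed column
// `cost = price * quantity` (fields 0 and 1) to each incoming row.
public class ComputedColumnFunction implements MapFunction<Row, Row> {

    @Override
    public Row map(Row value) {
        double price = (double) value.getField(0);
        long quantity = (long) value.getField(1);
        // Emit the physical fields plus the derived one.
        return Row.of(price, quantity, price * quantity);
    }
}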
Best,
Kurt
|
Thanks Timo for updating the formats section. That would be very helpful
for changelog support (FLIP-105).

I just left two minor comments about some method names. In general, I'm +1 to start a vote.

--------------------------------------------------------------------------------------------------

Hi Becket,

I agree we shouldn't duplicate code, especially the runtime implementations. However, the interfaces proposed by FLIP-95 are mainly used during optimization (compiling), not at runtime. I don't think there is much to share there, because table/sql is declarative while DataStream is imperative. Take filter push-down as an example: a DataStream FilterableSource may accept a FilterFunction (which is a black box for the source). Table sources, however, should pick from the pushed filter expressions, and some sources may only support "=", "<", ">" conditions. Pushing a FilterFunction doesn't work in the table ecosystem. That means the connectors have to have some table-specific implementations.
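To make that concrete, here is a small sketch of a table source picking only the expressions it supports; the Expression and Kind types and the applyFilters signature are simplified stand-ins, not the exact FLIP-95 API:

import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins so the sketch is self-contained.
interface Expression {
    Kind kind();
}

enum Kind { EQUALS, LESS_THAN, GREATER_THAN, LIKE, OTHER }

class JdbcLikeTableSource {

    private final List<Expression> pushedFilters = new ArrayList<>();

    // Consumes only the comparisons this source understands and returns the
    // rest, so the planner keeps a Filter node for the remaining predicates.
    List<Expression> applyFilters(List<Expression> filters) {
        List<Expression> remaining = new ArrayList<>();
        for (Expression filter : filters) {
            switch (filter.kind()) {
                case EQUALS:
                case LESS_THAN:
                case GREATER_THAN:
                    pushedFilters.add(filter);
                    break;
                default:
                    remaining.add(filter);
            }
        }
        return remaining;
    }
}

A black-box FilterFunction could not be split up this way, which is the core of the declarative-vs-imperative difference.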
Best,
Jark
|
Hey Kurt,
> I don't think DataStream should see SQL-specific concepts such as
> Filtering or ComputedColumn.

Projectable and Filterable are not necessarily SQL concepts; they could be applicable to a DataStream source as well to reduce the network load. For example, ORC and Parquet should probably also be readable from DataStream, right?

ComputedColumn is not part of the Source; it is an interface that extends the Deserializer, which is a pluggable component of the Source. From the SQL perspective it is the concept of a computed column, but from the Source perspective it is essentially a Deserializer that also converts the records internally, assuming we allow some conversion to be embedded in the source in addition to plain deserialization.
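For illustration, such a layering could look like the following sketch; all interfaces here are hypothetical and assume the deserializer is allowed to do light record conversion:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.types.Row;

// Hypothetical: the base deserialization contract of the source.
interface Deserializer<T> {
    T deserialize(byte[] message) throws IOException;
}

// Hypothetical refinement: implementations append derived fields while
// deserializing, which is all a computed column needs at runtime.
interface ComputedColumnDeserializer<T> extends Deserializer<T> {
}

// Example: parses `price,quantity` and appends the computed `cost` field.
class CsvWithCostDeserializer implements ComputedColumnDeserializer<Row> {

    @Override
    public Row deserialize(byte[] message) {
        String[] fields = new String(message, StandardCharsets.UTF_8).split(",");
        double price = Double.parseDouble(fields[0]);
        long quantity = Long.parseLong(fields[1]);
        // Physical fields plus the computed column `price * quantity`.
        return Row.of(price, quantity, price * quantity);
    }
}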
Thanks,

Jiangjie (Becket) Qin
|
Hi Jark,
> However, the interfaces proposed by FLIP-95 are mainly used during
> optimization (compiling), not at runtime.

Yes, I am aware of that. I am wondering whether the SQL planner can use the counterpart interfaces in the Source to apply the optimizations. It seems that should also work, right?

If we have another set of interfaces for the Table Source, every connector must expose two different sets of interfaces to work for both DataStream and Table. This will be per-connector work, regardless of how differently Flink uses these interfaces internally. And we will face the problem that sometimes a Source is available for DataStream but not for Table, which sounds weird. If we can make the single Source interface work for SQL, the story is much simpler. Once you have that Source implemented, the minimum thing needed to make it work for Table is pretty much a SchemaDeserializer<Row>; anything else would be an optional additional feature.

> Take filter push-down as an example: a DataStream FilterableSource may
> accept a FilterFunction (which is a black box for the source).
> Table sources, however, should pick from the pushed filter expressions,
> and some sources may only support "=", "<", ">" conditions.
> Pushing a FilterFunction doesn't work in the table ecosystem. That means
> the connectors have to have some table-specific implementations.

Can you articulate a bit more on this? For filtering there might be two cases:

1. The filter predicate is source specific, i.e. the source needs to understand it. Such filtering usually improves the performance by leveraging some index / reducing the network traffic.
2. The filter predicate is source agnostic, i.e. the source does not understand it. Such filtering happens in the Source itself after the records have been read from the remote system.

By FilterableSource, I meant the first case. So a Supplier<FilterPredicate> can be provided to convert a SQL expression to the required FilterPredicate, and that FilterPredicate will be used by the Source. For the second case, I'd say it should be part of the Deserializer, again assuming the Deserializer can also have some parsing logic in addition to its basic function.
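A sketch of case 1, with hypothetical Expression and FilterPredicate types; the point is that the table layer owns the Expression-to-predicate conversion, so the source itself never needs to understand SQL expressions:

import java.util.List;
import java.util.function.Function;

// Hypothetical placeholder types.
interface Expression {}
interface FilterPredicate {}

interface FilterableSource<P> {
    void applyFilter(P predicate);
}

class FilterPushDown {

    // The converter (e.g. an ExpressionToPredicate function supplied for the
    // table layer) translates SQL expressions into the source-specific
    // predicate type before the source ever sees them.
    static <P> void push(
            List<Expression> filters,
            Function<List<Expression>, P> converter,
            FilterableSource<P> source) {
        source.applyFilter(converter.apply(filters));
    }
}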
Thanks,

Jiangjie (Becket) Qin
|
In reply to this post by Becket Qin
Hi Becket,
it is true that concepts such as projection and filtering are worth having in the DataStream API as well, and a SourceFunction can provide interfaces for those concepts. In the table-related classes we will generate runtime classes that adhere to those interfaces and deal with RowData (possibly code generated and operating on binary data). We consider everything in the DataStream API to be "runtime" classes.

Which components a SourceFunction needs is an orthogonal discussion. Currently, I could imagine something like:

    SourceFunction(FlatMapFunction, WatermarkAssigner)

Once we have completed this discussion, the table sources and sinks will generate the FlatMapFunction (out of projection, filter, and computed columns) and the WatermarkAssigner (out of computed columns). Until then, we are using the old interfaces.
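Spelled out, such a composition could look like the following sketch. All names here are placeholders (in particular, WatermarkAssigner is not an existing interface); the concrete components are exactly what the FLIP-27 discussion still has to settle:

import org.apache.flink.api.common.functions.FlatMapFunction;

// Placeholder for the watermark component; not an existing Flink interface.
interface WatermarkAssigner<T> {
    long extractWatermark(T element, long previousWatermark);
}

// Hypothetical generic source that the table layer configures with
// generated components: the FlatMapFunction implements projection,
// filtering, and computed columns; the WatermarkAssigner is derived
// from computed columns.
class ComposedSourceFunction<IN, OUT> {

    private final FlatMapFunction<IN, OUT> flatMap;
    private final WatermarkAssigner<OUT> watermarkAssigner;

    ComposedSourceFunction(
            FlatMapFunction<IN, OUT> flatMap,
            WatermarkAssigner<OUT> watermarkAssigner) {
        this.flatMap = flatMap;
        this.watermarkAssigner = watermarkAssigner;
    }
}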
Regards,
Timo
|
In reply to this post by Becket Qin
Hi Becket,

I really think we don't have differing opinions; we might just not see the changes in the same way yet. Personally, I think of the DynamicTableSource as a factory for a Source implemented for the DataStream API. The important fact is that the DynamicTableSource and all the feature traits (SupportsFilterablePushDown, SupportsProjectablePushDown, etc.) work with Table API concepts such as Expressions, SQL-specific types, etc. In the end, the implementation would resemble the following (bear in mind I tremendously simplified the example, just to show the relation between the two APIs):

SupportsFilterablePushDown {

    applyFilters(List<ResolvedExpression> filters) {
        this.filters = convertToDataStreamFilters(filters);
    }

    Source createSource() {
        return Source.create()
            .applyFilters(this.filters);
    }
}

or exactly as you said for the computed columns:

SupportsComputedColumnsPushDown {

    applyComputedColumn(ComputedColumnConverter converter) {
        this.deserializationSchema = new DeserializationSchema<Row> {
            Row deserialize(...) {
                RowData row = format.deserialize(bytes); // original format, e.g. json, avro, etc.
                RowData enriched = converter(row);
            }
        };
    }

    Source createSource() {
        return Source.create()
            .withDeserialization(deserializationSchema);
    }
}

So to sum it up again, all those interfaces are factories that configure appropriate parts of the DataStream API using Table API concepts. Finally, to answer your question for the particular comparisons:

DynamicTableSource v.s. Source<Row, SourceSplitT, EnumChkT>
SupportsFilterablePushDown v.s. FilterableSource
SupportsProjectablePushDown v.s. ProjectableSource
SupportsWatermarkPushDown v.s. WithWatermarkAssigner
SupportsComputedColumnPushDown v.s. ComputedColumnDeserializer
ScanTableSource v.s. ChangeLogDeserializer

Pretty much, you can think of everything on the left as factories for the right side; the left side works with Table API classes (Expressions, DataTypes).
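Following the same pattern, the watermark trait from the list above would be (again tremendously simplified, with made-up method and class names rather than the exact FLIP interfaces):

SupportsWatermarkPushDown {

    applyWatermark(WatermarkAssigner<RowData> assigner) {
        this.watermarkAssigner = assigner;
    }

    Source createSource() {
        return Source.create()
            .withWatermarkAssigner(this.watermarkAssigner);
    }
}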
I hope this clarifies it a bit.

Best,

Dawid
|
Hi Timo and Dawid,
Thanks for the clarification. They really help. You are right that we are on the same page regarding the hierarchy. I think the only difference between our views is the flavor of the interfaces. There are two flavors of the source interface for DataStream and Table sources.

*Flavor 1. Table sources are wrapper interfaces around a DataStream source.*

Following this way, we will reach the design of the current proposal, i.e. each pluggable exposed in the DataStream source will have a corresponding TableSource interface counterpart at the factory level. Users will write code like this:

{
    MyTableSource myTableSource = MyTableSourceFactory.create();
    myTableSource.setSchema(mySchema);
    myTableSource.applyFilterPredicate(expression);
    ...
}

The good thing about this flavor is that, from the SQL / Table perspective, there is a dedicated set of Table-oriented interfaces.

The downsides are:
A. From the user's perspective, the DataStream source and the Table source are two different sets of interfaces, regardless of how similar they are internally.
B. Source developers have to implement both sets of interfaces in order to support both DataStream and Table.
C. It is not explicit that DataStream can actually share the pluggables of Table / SQL. For example, in order to provide a filter pluggable with a SQL expression, users have to know the actual converter class that converts the expression into the filter predicate and construct that converter by themselves.

---------------

*Flavor 2. A TableSource is a DataStream source with a bunch of pluggables. No Table-specific interfaces at all.*

Following this way, we will reach another design where you have a SourceFactory and a single pluggable factory for all the table pluggables. Users will write something like:

{
    Deserializer<Row> myTableDeserializer = MyTablePluggableFactory.createDeserializer(schema);
    MySource<Row> mySource = MySourceFactory.create(properties, myTableDeserializer);
    mySource.applyPredicate(MyTablePluggableFactory.createFilterPredicate(expression));
}

The good thing about this flavor is that there is just one set of interfaces that works for both Table and DataStream. There is no difference between creating a DataStream source and creating a Table source, and DataStream can easily reuse the pluggables from the Table sources.

The downside is that Table / SQL won't have a dedicated API for optimization. Instead of writing:

if (myTableSource instanceof FilterableTableSource) {
    // Some filter push-down logic.
    myTableSource.applyPredicate(expression);
}

one has to write:

if (mySource instanceof FilterableSource) {
    // Some filter push-down logic.
    mySource.applyPredicate(MyTablePluggableFactory.createFilterPredicate(expression));
}

-------------------------

Just to be clear, I am not saying flavor 2 is necessarily better than flavor 1, but I want to make sure flavor 2 is also considered and discussed.
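To make downside C of flavor 1 (and the createFilterPredicate call above) concrete, the converter would be something like the following sketch. The class and its shape are made up for illustration, not proposed API:

import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.flink.table.expressions.Expression;

// Hypothetical converter: turns a Table API expression into the source's
// native predicate type <P> (e.g. a Parquet FilterPredicate), so the same
// FilterableSource<P> can be driven from SQL and from the DataStream API.
class ExpressionToPredicate<P> implements Supplier<P> {

    private final Expression expression;
    private final Function<Expression, P> converter;

    ExpressionToPredicate(Expression expression, Function<Expression, P> converter) {
        this.expression = expression;
        this.converter = converter;
    }

    @Override
    public P get() {
        // converted lazily, when the source actually evaluates the predicate
        return converter.apply(expression);
    }
}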
Thanks,

Jiangjie (Becket) Qin.
|
Hi Becket,
Regarding Flavor 1 and Flavor 2, I want to clarify that users will never use a table source like this:

{
    MyTableSource myTableSource = MyTableSourceFactory.create();
    myTableSource.setSchema(mySchema);
    myTableSource.applyFilterPredicate(expression);
    ...
}

TableFactory and TableSource are not directly exposed to end users; all of these methods are called by the planner, not by users. Users always use DDL or a descriptor to register a table, and the planner will find the factory and create the sources according to the properties. All of the optimizations (e.g. filter/projection push-down) are applied automatically; users don't need to call `applyFilterPredicate` explicitly.
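In other words, during optimization the planner does something roughly like the sketch below. The types are simplified stand-ins for the FLIP's interfaces (the real applyFilters works on resolved Table API expressions, not strings):

import java.util.Arrays;
import java.util.List;

// Simplified stand-ins for the planner-facing pieces of the FLIP.
interface DynamicTableSource {}

interface SupportsFilterablePushDown {
    void applyFilters(List<String> filters); // simplified to strings here
}

class PlannerSketch {
    static void optimize(DynamicTableSource source) {
        // The planner, not the user, detects the ability interface and
        // pushes the filter expressions from the query into the source.
        if (source instanceof SupportsFilterablePushDown) {
            ((SupportsFilterablePushDown) source)
                .applyFilters(Arrays.asList("a = 1", "b < 10"));
        }
    }
}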
> > > mySource.applyPredicate(MyTablePluggableFactory.createFilterPredicate(expression)); > } > > ------------------------- > > Just to be clear, I am not saying flavor 2 is necessarily better than > flavor 1, but I want to make sure flavor 2 is also considered and > discussed. > > Thanks, > > Jiangjie (Becket) Qin. > > On Tue, Mar 24, 2020 at 10:53 PM Dawid Wysakowicz <[hidden email]> > wrote: > > > Hi Becket, > > > > I really think we don't have a differing opinions. We might not see the > > changes in the same way yet. Personally I think of the DynamicTableSource > > as of a factory for a Source implemented for the DataStream API. The > > important fact about the DynamicTableSource and all feature traits > > (SupportsFilterablePushDown, SupportsProjectPushDown etc.) work with > Table > > API concepts such as e.g. Expressions, SQL specific types etc. In the end > > what the implementation would resemble is (bear in mind I tremendously > > simplified the example, just to show the relation between the two APIs): > > > > SupportsFilterablePushDown { > > > > applyFilters(List<ResolvedExpression> filters) { > > > > this.filters = convertToDataStreamFilters(filters); > > > > } > > > > Source createSource() { > > > > return Source.create() > > > > .applyFilters(this.filters); > > > > } > > > > } > > > > or exactly as you said for the computed columns: > > > > > > SupportsComputedColumnsPushDown { > > > > > > > > applyComputedColumn(ComputedColumnConverter converter) { > > > > this.deserializationSchema = new DeserializationSchema<Row> { > > > > Row deserialize(...) { > > > > RowData row = format.deserialize(bytes); // original format, e.g > > json, avro, etc. > > > > RowData enriched = converter(row) > > > > } > > > > } > > > > } > > > > Source createSource() { > > > > return Source.create() > > > > .withDeserialization(deserializationSchema); > > > > } > > > > } > > > > So to sum it up again, all those interfaces are factories that configure > > appropriate parts of the DataStream API using Table API concepts. Finally > > to answer you question for particular comparisons: > > > > DynamicTableSource v.s. Source<Row, SourceSplitT, EnumChkT> > > SupportsFilterablePushDown v.s. FilterableSource > > SupportsProjectablePushDown v.s. ProjectableSource > > SupportsWatermarkPushDown v.s. WithWatermarkAssigner > > SupportsComputedColumnPushDown v.s. ComputedColumnDeserializer > > ScanTableSource v.s. ChangeLogDeserializer. > > > > pretty much you can think of all on the left as factories for the right > > side, left side works with Table API classes (Expressions, DataTypes). I > > hope this clarifies it a bit. > > > > Best, > > > > Dawid > > On 24/03/2020 15:03, Becket Qin wrote: > > > > Hey Kurt, > > > > I don't think DataStream should see some SQL specific concepts such as > > > > Filtering or ComputedColumn. > > > > Projectable and Filterable seems not necessarily SQL concepts, but could > be > > applicable to DataStream source as well to reduce the network load. For > > example ORC and Parquet should probably also be readable from DataStream, > > right? > > > > ComputedColumn is not part of the Source, it is an interface extends the > > Deserializer, which is a pluggable for the Source. From the SQL's > > perspective it has the concept of computed column, but from the Source > > perspective, It is essentially a Deserializer which also converts the > > records internally, assuming we allow some conversion to be embedded to > > the source in addition to just deserialization. 
> > On 24/03/2020 15:03, Becket Qin wrote:
> >
> > Hey Kurt,
> >
> > > I don't think DataStream should see some SQL specific concepts such as Filtering or ComputedColumn.
> >
> > Projectable and Filterable seem not necessarily SQL concepts; they could be applicable to a DataStream source as well to reduce the network load. For example, ORC and Parquet should probably also be readable from DataStream, right?
> >
> > ComputedColumn is not part of the Source; it is an interface extending the Deserializer, which is a pluggable for the Source. From SQL's perspective it is the concept of a computed column, but from the Source's perspective it is essentially a Deserializer that also converts the records internally, assuming we allow some conversion to be embedded in the source in addition to just deserialization.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Mar 24, 2020 at 9:36 PM Jark Wu <[hidden email]> wrote:
> >
> > Thanks Timo for updating the formats section. That will be very helpful for the changelog support (FLIP-105). I just left two minor comments about some method names. In general, I'm +1 to start a vote.
> >
> > --------------------------------------------------------------------------------------------------
> >
> > Hi Becket,
> >
> > I agree we shouldn't duplicate code, especially the runtime implementations. However, the interfaces proposed by FLIP-95 are mainly used during optimization (compiling), not at runtime, and I don't think there is much to share there, because Table/SQL is declarative while DataStream is imperative. Take filter push down as an example: a DataStream FilterableSource may allow accepting a FilterFunction, which is a black box for the source. Table sources, however, should pick from the pushed filter expressions, and some sources may only support "=", "<", ">" conditions. Pushing a FilterFunction doesn't work in the table ecosystem. That means the connectors have to have some table-specific implementations.
> >
> > Best,
> > Jark
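To see why expression push-down needs this table-specific layer, here is a toy source that accepts only the comparisons it supports and hands the rest back to the planner (simplified stand-ins again; the actual applyFilters contract in the FLIP may differ):

    import java.util.ArrayList;
    import java.util.List;

    // Simplified stand-in for a resolved SQL filter expression.
    record Filter(String column, String op, Object value) {}

    class ToyJdbcLikeTableSource {
        final List<Filter> accepted = new ArrayList<>();

        // The source keeps the filters it can evaluate ("=", "<", ">") and
        // returns the remaining ones, which the planner must still apply
        // after the scan.
        List<Filter> applyFilters(List<Filter> offered) {
            List<Filter> remaining = new ArrayList<>();
            for (Filter f : offered) {
                switch (f.op()) {
                    case "=", "<", ">" -> accepted.add(f);
                    default -> remaining.add(f); // e.g. LIKE, IS NULL
                }
            }
            return remaining;
        }
    }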
> > On Tue, 24 Mar 2020 at 20:41, Kurt Young <[hidden email]> wrote:
> >
> > Hi Becket,
> >
> > I don't think DataStream should see some SQL-specific concepts such as Filtering or ComputedColumn. It's better to stay within the SQL area and translate to more generic concepts when translating to the DataStream/runtime layer, e.g. using a MapFunction to represent the computed column logic.
> >
> > Best,
> > Kurt
> >
> > On Tue, Mar 24, 2020 at 5:47 PM Becket Qin <[hidden email]> wrote:
> >
> > Hi Timo and Dawid,
> >
> > It's really great that we have the same goal. I am actually wondering if we can go one step further and avoid some of the interfaces in Table as well. For example, if we have the FilterableSource, do we still need the FilterableTableSource? Should DynamicTableSource just become a Source<Row, SourceSplitT, EnumChkT>?
> >
> > Can you help me understand a bit more the reason we need the following relational representation / wrapper interfaces vs. the interfaces that we could put on the Source in FLIP-27?
> >
> > DynamicTableSource vs. Source<Row, SourceSplitT, EnumChkT>
> > SupportsFilterablePushDown vs. FilterableSource
> > SupportsProjectablePushDown vs. ProjectableSource
> > SupportsWatermarkPushDown vs. WithWatermarkAssigner
> > SupportsComputedColumnPushDown vs. ComputedColumnDeserializer
> > ScanTableSource vs. ChangeLogDeserializer
> > LookUpTableSource vs. LookUpSource
> >
> > Assuming we have all the interfaces on the right side, do we still need the interfaces on the left side? Note that the interfaces on the right can be used by both DataStream and Table. If we do this, there will only be one set of Source interfaces for Table and DataStream; the only difference is that the Source for Table will have some specific plugins and configurations. An omnipotent Source can implement all of the above interfaces and take a Deserializer that implements both ComputedColumnDeserializer and ChangeLogDeserializer.
> >
> > Would the SQL planner work with that?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Mar 24, 2020 at 5:03 PM Jingsong Li <[hidden email]> wrote:
> >
> > +1. Thanks Timo for the design doc.
> >
> > We could also consider @Experimental. But I am +1 to @PublicEvolving; we should be confident in the current change.
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Mar 24, 2020 at 4:30 PM Timo Walther <[hidden email]> wrote:
> >
> > @Becket: We totally agree that we don't need table-specific connectors during runtime. As Dawid said, the interfaces proposed here are just for communication with the planner. Once the properties (watermarks, computed columns, filters, projection, etc.) are negotiated, we can configure a regular Flink connector, e.g. by setting the watermark assigner and deserialization schema of a Kafka connector.
> >
> > For better separation of concerns, Flink connectors should not include relational interfaces and depend on flink-table. This is the responsibility of the table source/sink.
> >
> > @Kurt: I would like to mark them @PublicEvolving already because we need to deprecate the old interfaces as early as possible. We cannot redirect to @Internal interfaces. They are not marked @Public, so we can still evolve them. But a core design shift should not happen again; it would leave a bad impression if we redesign over and over again. Instead we should be confident in the current change.
> >
> > Regards,
> > Timo
> >
> > On 24.03.20 09:20, Dawid Wysakowicz wrote:
> >
> > Hi Becket,
> >
> > Answering your question: we have the same intention not to duplicate connectors between the DataStream and Table APIs. The interfaces proposed in the FLIP are a way to describe the relational properties of a source. The intention is, as you described, to translate all of those expressed as expressions or other Table-specific structures into a DataStream source. In other words, I think what we are doing here is in line with what you described.
> >
> > Best,
> >
> > Dawid
> > On 24/03/2020 02:23, Becket Qin wrote:
> >
> > Hi Timo,
> >
> > Thanks for the proposal. I completely agree that the current Table connectors could be simplified quite a bit. I haven't finished reading everything, but here are some quick thoughts.
> >
> > Actually, to me the biggest question is why there should be two different connector systems for DataStream and Table. What is the fundamental reason that prevents us from merging them into one?
> >
> > The basic functionality of a connector is to provide the capabilities to do IO and serde. Conceptually, Table connectors should just be DataStream connectors that deal with Rows. It seems that quite a few of the special connector requirements are just a specific way to do IO / serde. Taking SupportsFilterPushDown as an example, imagine we have the following interface:
> >
> > interface FilterableSource<PREDICATE> {
> >     void applyFilterable(Supplier<PREDICATE> predicate);
> > }
> >
> > And if a ParquetSource would like to be filterable, it becomes:
> >
> > class ParquetSource implements Source, FilterableSource<FilterPredicate> {
> >     ...
> > }
> >
> > For Table, one just needs to provide a predicate supplier that converts an Expression to the specified predicate type. This has a few benefits:
> >
> > 1. The same unified API for filterable sources, regardless of DataStream or Table.
> > 2. DataStream users can now also use the ExpressionToPredicate supplier if they want to.
> >
> > To summarize, my main point is that I am wondering if it is possible to have a single set of connector interfaces for both Table and DataStream, rather than having two hierarchies. I am not 100% sure if this would work, but if it does, it would be a huge win from both a code maintenance and a user experience perspective.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Mar 24, 2020 at 2:03 AM Dawid Wysakowicz <[hidden email]> wrote:
> >
> > Hi Timo,
> >
> > Thank you for the proposal. I think it is an important improvement that will benefit many parts of the Table API. The proposal looks really good to me, and personally I would be comfortable with voting on the current state.
> >
> > Best,
> >
> > Dawid
> >
> > On 23/03/2020 18:53, Timo Walther wrote:
> >
> > Hi everyone,
> >
> > I received some questions around how the new interfaces play together with formats and their factories. Furthermore, for MySQL or Postgres CDC logs, the format should be able to return a `ChangelogMode`. Also, I incorporated the feedback around the factory design in general.
> >
> > I added a new section `Factory Interfaces` to the design document. It should be helpful for understanding the big picture and connecting the concepts. Please let me know what you think.
> >
> > Thanks,
> > Timo
> >
> > On 18.03.20 13:43, Timo Walther wrote:
> >
> > Hi Benchao,
> >
> > this is a very good question. I will update the FLIP about this. The legacy planner will not support the new interfaces; it will only support the old interfaces. With the next release, I think the Blink planner is stable enough to be the default one as well.
> >
> > Regards,
> > Timo
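A compilable toy version of the Supplier idea from the ParquetSource sketch quoted above; all names here are hypothetical stand-ins, and the predicate is just a String for brevity:

    import java.util.function.Supplier;

    // The proposed DataStream-level interface, as sketched in the thread.
    interface FilterableSource<PREDICATE> {
        void applyFilterable(Supplier<PREDICATE> predicate);
    }

    class ToyParquetSource implements FilterableSource<String> {
        private String predicate;

        @Override
        public void applyFilterable(Supplier<String> supplier) {
            this.predicate = supplier.get(); // evaluated by the source when needed
        }
    }

    public class SupplierUsage {
        public static void main(String[] args) {
            ToyParquetSource source = new ToyParquetSource();
            // Table side: the supplier wraps an Expression-to-predicate converter.
            source.applyFilterable(() -> "converted from ResolvedExpression: a = 1");
            // DataStream side: the same API, with a natively built predicate.
            source.applyFilterable(() -> "a = 1");
        }
    }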
Hi Jark,
It is good to know that we do not expect end users to touch those interfaces. Then the question boils down to whether the connector developers should be aware of the interfaces that are only used by the SQL optimizer. It seems a win if we can avoid that. Two potential solutions off the top of my head are:

1. An internal helper class that does the instanceof checks against the DataStream source interfaces and creates the pluggables for that DataStream source (a sketch follows below).
2. Code-generating the set of TableSource interfaces, given a DataStream Source and its corresponding TablePluggablesFactory.

Thanks,

Jiangjie (Becket) Qin
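For illustration, a minimal sketch of solution 1; every name here (FilterableSource, SupportsFilterablePushDown, the bridge class, the converter) is a hypothetical stand-in following the naming used earlier in the thread:

    import java.util.List;
    import java.util.function.Function;

    // Hypothetical DataStream-level pluggable ("flavor 2" naming).
    interface FilterableSource<P> {
        void applyPredicate(P predicate);
    }

    // Hypothetical table-level trait seen only by the SQL optimizer.
    interface SupportsFilterablePushDown {
        void applyFilters(List<String> expressions);
    }

    // Internal helper: bridges the optimizer-facing trait to the DataStream
    // source, so connector developers never implement the table interfaces.
    class FilterPushDownBridge<P> implements SupportsFilterablePushDown {
        private final FilterableSource<P> source;
        private final Function<String, P> expressionConverter;

        FilterPushDownBridge(FilterableSource<P> source, Function<String, P> converter) {
            this.source = source;
            this.expressionConverter = converter;
        }

        @Override
        public void applyFilters(List<String> expressions) {
            // Convert each SQL expression into the source's native predicate type.
            for (String expr : expressions) {
                source.applyPredicate(expressionConverter.apply(expr));
            }
        }
    }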