Hi all,
I'd like to start a discussion about improving the new TableSource and TableSink interfaces.

Most connectors have been migrated to FLIP-95, but the Filesystem and Hive connectors have not been migrated yet. They have some requirements on the table connector API, and users have some additional requirements:
- Some connectors can infer a parallelism, and the inferred parallelism is a good choice for most cases.
- Users need to customize the parallelism of sources and sinks.
- Some connectors need a topology to build their source/sink instead of a single function, e.g. JIRA [1], the Partition Commit feature, and the File Compaction feature.

Details are in [2].

[1] https://issues.apache.org/jira/browse/FLINK-18674
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces

Best,
Jingsong
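A minimal sketch of the kind of hook the FLIP proposes for the first two requirements (names follow the FLIP draft in [2]; the final API was still under discussion at this point):

```java
import java.util.Optional;

/**
 * Sketch of the proposed parallelism hook: a mixin for runtime providers
 * (e.g. SourceFunctionProvider, SinkFunctionProvider) that lets a connector
 * report a parallelism, whether user-configured or inferred.
 */
public interface ParallelismProvider {

    /**
     * Returns the parallelism for this source/sink instance.
     * {@code Optional.empty()} means the planner decides, as before.
     */
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}
```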
+1, this is good news.
> On Sep 23, 2020, at 6:22 PM, Jingsong Li <[hidden email]> wrote:
Thanks Jingsong for driving this; this is indeed a useful feature and lots of users are asking for it.

For setting a fixed source parallelism, I'm wondering whether this is necessary. For Kafka, I can imagine users would expect Flink to use the number of partitions as the parallelism; if that is too large, one can use the max parallelism to make it smaller. But for ES, which doesn't have the ability to decide a reasonable parallelism on its own, it might make sense to introduce a user-specified parallelism for such a table source.

So I think it would be better to reorganize the document a little bit to explain the connectors one by one: briefly introduce the use cases and what kind of options are needed in your opinion.

Regarding the interface `DataStreamScanProvider`, a concrete example would help the discussion. What kind of scenarios do you want to support? And what kind of connectors need such an interface?

Best,
Kurt

On Wed, Sep 23, 2020 at 9:30 PM admin <[hidden email]> wrote:
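For illustration, the Kafka-style inference Kurt describes boils down to capping the partition count with a configured upper bound (a sketch; the method and option names are illustrative):

```java
/** Sketch of partition-based parallelism inference with an upper bound. */
public final class ParallelismInference {

    /**
     * Use the partition count as the source parallelism, capped by a
     * user-configured maximum (e.g. a 'scan.infer-parallelism.max' option).
     */
    static int inferSourceParallelism(int numPartitions, int inferMaxParallelism) {
        return Math.max(1, Math.min(numPartitions, inferMaxParallelism));
    }
}
```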
Hi Kurt, in the past we had a very interesting use case in this regard: our customer's (Oracle) database was quite big, and running too many queries in parallel was too heavy and was causing the queries to fail. So we had to limit the source parallelism to 2 threads. After fetching the data, the other operators could use the max parallelism as usual.

Best,
Flavio

On Thu, Sep 24, 2020 at 9:59 AM Kurt Young <[hidden email]> wrote:
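A use case like this is what a user-specified parallelism option would address. A sketch of what the DDL could look like, assuming the `scan.parallelism` option discussed later in this thread (the option is a proposal, not a released one, and the table definition is illustrative):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FixedScanParallelism {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // 'scan.parallelism' pins the source to 2 parallel instances so the
        // database is not overwhelmed; downstream operators keep the default.
        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  id BIGINT,\n"
                        + "  amount DECIMAL(10, 2)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:oracle:thin:@//host:1521/db',\n"
                        + "  'table-name' = 'ORDERS',\n"
                        + "  'scan.parallelism' = '2'\n"
                        + ")");
    }
}
```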
Yeah, JDBC is definitely a popular use case we should consider.
Best,
Kurt

On Thu, Sep 24, 2020 at 4:11 PM Flavio Pompermaier <[hidden email]> wrote:
Thanks Kurt and Flavio for your feedback.
To Kurt:

> Briefly introduce use cases and what kind of options are needed in your opinion.

In the "Choose Scan Parallelism" chapter:
- I explained the use cases.
- I adjusted the relationship between the options to make user-specified parallelism more convenient.

To Flavio: Yes, you can configure `scan.infer-parallelism.max` or directly use `scan.parallelism`.

To Kurt:

> Regarding the interface `DataStreamScanProvider`, a concrete example would help the discussion.

From the user feedback in [1], two users gave similar feedback (CC: liu shouwei). It is from user-zh, translated to English:

To briefly introduce the background: one task of our group is that users write SQL on a web page, and we are responsible for converting the SQL into Flink jobs and running them on our platform. The conversion process depends on our SQL SDK. Let me give a few examples that we use often and that we feel are not easy to implement with the new 1.11 API:
1. We have a specific Kafka data format where one Kafka record is converted into n (n is a positive integer) rows of data. Our approach is to add a process/flatMap stage on the emitted DataStream to handle this, transparently to users.
2. We have encapsulated some sinks of our own. We add a process/filter before the sink to perform buffer-pool / micro-batch / data-filtering functions.
3. We adjust or set the source/sink parallelism to a user-specified value. We also do this at the DataStream level.
4. Some special sources and sinks are combined with keyBy operations (transparently to users). We also do this at the DataStream level.

Take point 2 above: we could implement it in the SinkFunction, but I personally think that is not ideal in design. When designing and arranging functions/operators, I follow the principle of "single responsibility of operators". This is why I split out multiple process/filter operators in front of the sink function instead of coupling these functions into the sink function. On the other hand, without DataStream, the cost of migrating to the new API is relatively high. And for some special reasons, we modify the task chaining strategy when operators are arranged; here the flexibility of DataStream is essential.

[1] https://issues.apache.org/jira/browse/FLINK-18674

Best,
Jingsong

On Thu, Sep 24, 2020 at 4:15 PM Kurt Young <[hidden email]> wrote:
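A sketch of a provider that would cover cases 1, 2 and 4 above (following the FLIP draft; the exact interface was still under discussion): the connector consumes the input DataStream itself, so it can build a small topology in front of the terminal sink.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;

/**
 * Sketch of a topology-based sink provider: instead of returning a single
 * SinkFunction, the connector receives the input stream and may add stages
 * (filter, keyBy, buffering) before the terminal sink.
 */
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider {

    /** Consumes the input stream and returns the terminal sink of the topology. */
    DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);
}
```

A `DataStreamScanProvider` would be the mirror image on the source side, producing a `DataStream<RowData>` from the execution environment.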
Hi Jingsong,
Thanks for preparing this FLIP. WRT ParallelismProvider, it looks good to me.

On Thu, Sep 24, 2020 at 4:14 PM, Kurt Young <[hidden email]> wrote:

--
Best,
Benchao Li
Thanks for the proposal! I think the use cases that we are trying to solve are indeed valid. However, I think we might have to take a step back to look at what we're trying to solve and how we can solve it.

The FLIP seems to have two broader topics: 1) add "get parallelism" to sinks/sources, and 2) let users write DataStream topologies for sinks/sources. I'll treat them separately below.

I think we should not add "get parallelism" to the Table Sink API because I think it's the wrong level of abstraction. The Table API connectors are (or should be) more or less thin wrappers around "physical" connectors. By "physical" I mean the underlying (mostly DataStream API) connectors. For example, with the Kafka connector the Table API connector just does the configuration parsing, determines a good (de)serialization format, and then creates the underlying FlinkKafkaConsumer/FlinkKafkaProducer.

If we wanted to add a "get parallelism" it would be in those underlying connectors, but I'm also skeptical about adding such a method there because it is a static assignment and would preclude clever optimizations about the parallelism of a connector at runtime. But maybe that's thinking too much about future work, so I'm open to discussion there.

Regarding the second point of letting Table connector developers use DataStream: I think we should not do it. One of the purposes of FLIP-95 [1] was to decouple the Table API from the DataStream API for the basic interfaces. Coupling the two too closely at that basic level will make our lives harder in the future when we want to evolve those APIs or when we want the system to be better at choosing how to execute sources and sinks. An example of this is actually the past of the Table API: before FLIP-95 we had connectors that dealt directly with DataSet and DataStream, meaning that if users wanted their Table Sink to work in both BATCH and STREAMING mode they had to provide two implementations. The trend is towards unifying the sources/sinks to common interfaces that can be used for both BATCH and STREAMING execution but, again, I think exposing DataStream here would be a step back in the wrong direction.

I think the solution to the existing user requirement of using DataStream sources and sinks with the Table API should be better interoperability between the two APIs, which is being tackled right now in FLIP-136 [2]. If FLIP-136 is not adequate for the use cases that we're trying to solve here, maybe we should think about FLIP-136 some more.

What do you think?

Best,
Aljoscha

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
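For concreteness, a sketch of the FLIP-136-style alternative Aljoscha refers to: keep the custom topology and the explicit parallelism in the DataStream API, then bridge into the Table API. `MyCustomSource` is a hypothetical user source (assumed to implement `ParallelSourceFunction<Row>`), and whether this covers catalog integration and pushdowns is exactly the open question in this thread.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class DataStreamBridgeExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Custom topology and explicit parallelism stay in the DataStream API.
        DataStream<Row> source =
                env.addSource(new MyCustomSource()) // hypothetical user source
                        .setParallelism(2);

        // Bridge into the Table API and use it like any other (temporary) table.
        Table table = tEnv.fromDataStream(source);
        tEnv.createTemporaryView("my_source", table);

        tEnv.executeSql("SELECT * FROM my_source").print();
    }
}
```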
Hi Aljoscha, I would like to share a use case that seconds setting the parallelism of a table sink (or limiting the parallelism range of a table sink): when writing data to databases, there are limits on the number of JDBC connections and on query TPS. If the optimizer doesn't know this and matches the sink parallelism to that of its input, we get "too many connections" errors, high load on the DB, and poor performance because of too many small requests.

On Thu, 24 Sep 2020 at 22:41, Aljoscha Krettek <[hidden email]> wrote:
Hi Aljoscha,
Thank you for your feedback.

## Connector parallelism

Requirement: set the parallelism either as specified by the user or as inferred by the connector.

How parallelism is configured in DataStream: in the DataStream world, the only way to configure parallelism is `SingleOutputStreamOperator.setParallelism`. So users need access to the DataStream when using a connector, not just the `SourceFunction` / `Source` interface. Is parallelism related to connectors? I think yes: many connectors can supply parallelism-related information, and users rely on exactly that. This is parallelism inference (from connectors). The key is that `DataStream` is an open programming API, and users can freely program to set the parallelism.

How parallelism is configured in Table/SQL: Table/SQL is not an open programming API; every feature needs a corresponding mechanism, because the user is no longer able to program. Through our current connector interfaces, SourceFunctionProvider and SinkFunctionProvider, there is no way to provide connector-related parallelism. Back to our original intention: to avoid users directly manipulating `DataStream`. Since we want to avoid that, we need to provide the corresponding features.

And parallelism is runtime information of a connector, so it fits the name `ScanRuntimeProvider`.

> If we wanted to add a "get parallelism" it would be in those underlying connectors but I'm also skeptical about adding such a method there because it is a static assignment and would preclude clever optimizations about the parallelism of a connector at runtime.

I think that when a job is submitted, it is compile time, and at that point we should only provide a static parallelism.

## DataStream in table connector

As I said before, if we want to completely remove DataStream from the table connectors, we need to provide the corresponding capabilities in the `xxRuntimeProvider` interfaces. Otherwise, we and our users may not be able to migrate the old connectors, including the Hive/FileSystem connectors and the user cases I mentioned above. CC: @liu shouwei

We really need to consider these cases. If there is no alternative in the short term, then for a long time users will need to keep using the old table connector API, which has been deprecated.

Why not use StreamTableEnvironment fromDataStream/toDataStream?
- These tables are just temporary tables and cannot be integrated/stored into a Catalog.
- CREATE TABLE DDL cannot work.
- We lose all kinds of useful Table/SQL features on the connector, for example projection pushdown, filter pushdown, partitions, etc.

But I believe you are right in the long run: the source and sink APIs should be powerful enough to cover all reasonable cases. Maybe we can introduce the new providers in a minimal way; for example, we could introduce `DataStreamSinkProvider` in the planner only as an internal API.

Your points are very meaningful; I hope to get your reply.

Best,
Jingsong

On Fri, Sep 25, 2020 at 10:55 AM wenlong.lwl <[hidden email]> wrote:
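To make the gap concrete, here is (roughly) today's FLIP-95 path: the connector hands the planner a bare SourceFunction and never sees the resulting DataStream, so there is no place to set a parallelism (a sketch against the existing 1.11 interfaces):

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;

public class ExistingProvider {

    /**
     * The FLIP-95 contract: only the function is returned; the planner
     * builds the DataStream internally, so the connector cannot influence
     * the operator's parallelism from here.
     */
    static ScanTableSource.ScanRuntimeProvider provider(SourceFunction<RowData> fn) {
        return SourceFunctionProvider.of(fn, /* bounded */ false);
    }
}
```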
Hi Jingsong,
Thanks for driving this effort. I have two minor comments.

1. IMHO, parallelism is a concept that applies to every ScanTableSource. So instead of defining a new interface, isn't it more natural to incorporate parallelism inference into the existing interfaces, e.g. ScanTableSource or ScanRuntimeProvider?
2. `scan.infer-parallelism.enabled` doesn't seem very useful to me. From a user's perspective, parallelism is either set by `scan.parallelism` or automatically decided by Flink. If a user doesn't want the connector to infer parallelism, he/she can simply set `scan.parallelism`, no?

On Fri, Sep 25, 2020 at 3:33 PM Jingsong Li <[hidden email]> wrote:

--
Best regards!
Rui Li
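Rui's second point amounts to a simple precedence rule; a sketch of that reading (option names as discussed in this thread, the helper itself is illustrative):

```java
import java.util.Optional;
import java.util.function.Supplier;

public final class ScanParallelismResolution {

    /**
     * An explicit 'scan.parallelism' wins; otherwise fall back to connector
     * inference, with no separate enable/disable switch.
     */
    static int resolve(Optional<Integer> scanParallelism, Supplier<Integer> inferred) {
        return scanParallelism.orElseGet(inferred);
    }
}
```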
Hi everyone,
Thanks for the proposal. In our company, we are in the same situation as @liu shouwei. We developed some features on top of Flink, such as parallelism for SQL source/sink connectors, and a Kafka delay consumer, which adds a flatMap and a keyBy transformation after the source DataStream. What puts us in an awkward position is that when we migrated these features to Flink 1.11, we found that the DataStream was gone, so we modified Blink's code to support parallelism; but the Kafka delay consumer is unsolved until now.

From a user's perspective, it is necessary to be able to manipulate the DataStream, or to have interoperability between the Table API and DataStream.

Best

> On Sep 25, 2020, at 4:18 PM, Rui Li <[hidden email]> wrote:
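For readers unfamiliar with the pattern, a sketch of a "delay consumer" stage of the kind described above (illustrative, not the poster's actual code): buffer records in keyed state and re-emit them after a fixed processing-time delay.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Re-emits each record delayMs after it arrived, grouped by fire time. */
public class DelayFunction extends KeyedProcessFunction<String, String, String> {

    private final long delayMs;
    private transient MapState<Long, List<String>> buffer; // fire time -> records

    public DelayFunction(long delayMs) {
        this.delayMs = delayMs;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("buffer", Types.LONG, Types.LIST(Types.STRING)));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        long fireAt = ctx.timerService().currentProcessingTime() + delayMs;
        List<String> batch = buffer.get(fireAt);
        if (batch == null) {
            batch = new ArrayList<>();
        }
        batch.add(value);
        buffer.put(fireAt, batch);
        ctx.timerService().registerProcessingTimeTimer(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        List<String> batch = buffer.get(timestamp);
        if (batch != null) {
            for (String value : batch) {
                out.collect(value);
            }
            buffer.remove(timestamp);
        }
    }
}
```

Used as `stream.keyBy(...).process(new DelayFunction(5000))` right after the Kafka source; this is exactly the extra stage that has no home when a Table connector can only return a single SourceFunction.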
Hi,
I'll only respond regarding the parallelism for now because I need to think some more about DataStream.

What I'm saying is that exposing a parallelism only for Table connectors is not the right thing. If we want to allow sources to tell the system/framework what would be a good parallelism, it should happen at the underlying level.

I'll explain with the SourceFunction. A Table API source connector is basically a factory that will give you a SourceFunction corresponding to whatever the user configured via properties and other means. If the Table connector somehow happens to know what would be a good parallelism for the source, it could "tell" the source when creating it, i.e.

new MyRealSource(path, and, whatnot, parallelismHint)

Then the source could either work with the information it got, by shutting down (at runtime) some of its parallel instances, or we could extend the Source (SourceFunction) API to expose a "parallelism hint" to the system.

The basic point is that Table connectors are not the real connectors; they just delegate to the underlying real connectors. So those underlying connectors are where we need to change things. Otherwise we would just have special-case solutions for the Table API.

Best,
Aljoscha

On 25.09.20 14:30, admin wrote:
Hi Aljoscha,
I want to separate `Customized parallelism` from `Parallelism inference`.

### Customized parallelism

First, I want to explain the current DataStream parallelism setting: `env.fromSource(...).setParallelism(...)`. This is how users explicitly specify parallelism, and it is the only way to set it.

The underlying Source (e.g. SourceFunction) is completely independent of any specific parallelism; the surrounding DataStream is responsible for setting it. The table layer needs to provide an equivalent capability.

### Parallelism inference

Some sources can infer their parallelism. For Kafka, for example, the parallelism can be inferred from the partition count.

I think you are right: we should provide this to the underlying Source. This capability is necessarily tied to the underlying Source (e.g. SourceFunction), so it requires a new interface on the underlying Source. The table layer then just tells the underlying Source that the user wants to enable parallelism inference:

new MyRealSource(path, and, whatnot, parallelismInfer = true)

What do you think?

Best,
Jingsong

On Tue, Sep 29, 2020 at 8:48 PM Aljoscha Krettek <[hidden email]> wrote:
> If we want to allow sources to tell the system/framework what would be a good parallelism it would be at the underlying level.
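(A minimal sketch to make the above concrete. `MyRealSource`, the `parallelismInfer` flag, and the `inferredParallelism()` hint are hypothetical placeholders from this discussion, not existing Flink API; only `RichParallelSourceFunction` and `SourceContext` are real interfaces.)

```java
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

// Hypothetical source from the discussion: the table layer constructs it with
// parallelismInfer = true when the user enables inference, and the source
// itself decides what a good parallelism would be (e.g. from the number of
// splits under `path`).
public class MyRealSource extends RichParallelSourceFunction<String> {

    private final String path;
    private final boolean parallelismInfer;
    private volatile boolean running = true;

    public MyRealSource(String path, boolean parallelismInfer) {
        this.path = path;
        this.parallelismInfer = parallelismInfer;
    }

    // Hypothetical static hint the framework could query at compile time,
    // before fixing the operator parallelism; -1 means "no opinion".
    public int inferredParallelism() {
        return parallelismInfer ? countSplits(path) : -1;
    }

    private static int countSplits(String path) {
        return 4; // placeholder: a real source would list files/partitions here
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("record from " + path);
            }
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

This keeps the hint static, which matches the earlier point that a submitted job is at compile time and should only provide static parallelism.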
Hi Jingsong,
I'm sorry, I didn't want to block you for so long on this. I thought about it again.

I think it's fine to add a DataStream Provider if this really unblocks users from migrating to newer Flink versions. I'm guessing you will add that to the table bridge module?

Regarding the parallelism: I see your point about letting users set it explicitly. I'm still skeptical, but then I also think it wasn't such a good idea to let users specify the parallelism of individual operations in the DataStream API in the first place, because it takes freedom away from the framework. So if it's really something that users need, we should go ahead.

Best,
Aljoscha

On 09.10.20 13:57, Jingsong Li wrote:
> I want to separate `Customized parallelism` from `Parallelism inference`.
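(For readers following the thread: the provider under discussion can stay very small. Below is a minimal sketch of what a `DataStreamSinkProvider` in the bridge module might look like, assuming it plugs into the FLIP-95 `DynamicTableSink.SinkRuntimeProvider` hierarchy; the exact name and signature were still open at this point.)

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;

// Sketch: instead of returning a single SinkFunction, the connector receives
// the whole input stream and may build a topology behind it, e.g. a file
// compaction operator followed by a partition-commit operator.
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider {

    DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);
}
```

Because the planner hands over the `DataStream<RowData>` only at the very end of planning, DDL/catalog integration and pushdown optimizations still apply, which is what distinguishes this from plain `fromDataStream`/`toDataStream` interoperability.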
Hi Aljoscha,
Thanks for your feedback.

Yes, we should add the DataStream providers to the table bridge module.

I think your concerns are valid, including those about the relationship between DataStream and Table. My understanding is that the parallelism specified by the user is only the initial parallelism; after that, the framework still has a certain amount of freedom. It is much like a user's targeted intervention in the optimizer.

At present, the final state of the source parallelism setting is not clear (DataStream is not ready, and 1.12 is imminent), so I am considering shelving source parallelism. Let's focus on the sink parallelism setting and the rest of this FLIP (users specify the parallelism on the sink). Then we can confirm that users will be able to migrate their connectors to the new table source/sink interfaces in Flink 1.12.

What do you think?

Best,
Jingsong

On Tue, Oct 13, 2020 at 5:01 PM Aljoscha Krettek <[hidden email]> wrote:
> I'm guessing you will add that to the table bridge module?
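(To illustrate the sink half that the thread converges on: a sketch of the user-facing side, assuming a connector option named `sink.parallelism` as proposed in the FLIP; the table definition and JDBC settings are illustrative only.)

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SinkParallelismExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Cap the number of concurrent JDBC writers at 4, independent of the
        // parallelism of the query that feeds the sink.
        tEnv.executeSql(
            "CREATE TABLE user_stats_sink (" +
            "  user_id BIGINT," +
            "  cnt BIGINT" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:mysql://localhost:3306/mydb'," +
            "  'table-name' = 'user_stats'," +
            "  'sink.parallelism' = '4'" +
            ")");
    }
}
```

This matches the JDBC use case shared upthread: the writer parallelism stays capped even when the input to the sink runs much wider.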
Agreed!
Aljoscha